feat: bigquery catalog with iceberg support #393
Conversation
Walkthrough
The changes update dependency management and build configurations across multiple modules. A new shared_deps variable in cloud_gcp/BUILD.bazel centralizes shared dependencies.
Sequence Diagram(s)
sequenceDiagram
participant C as Client
participant D as DelegatingCatalog
participant I as IcebergCatalog
participant B as BigQueryCatalog
C->>D: loadTable(ident)
D->>I: try loadTable(ident)
alt Iceberg success
I-->>D: Table
D-->>C: Return Table
else Failure
D->>B: loadTable(ident)
B-->>D: Table
D-->>C: Return Table
end
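The fallback shown in the diagram can be summarized with a minimal Scala sketch; the simplified trait and class names below are illustrative, not the PR's actual classes.

import scala.util.Try

// Minimal sketch of the load path above: try the Iceberg catalog first,
// fall back to BigQuery if that lookup fails.
trait SimpleCatalog {
  def loadTable(ident: String): AnyRef
}

class DelegatingCatalogSketch(iceberg: SimpleCatalog, bigQuery: SimpleCatalog) extends SimpleCatalog {
  override def loadTable(ident: String): AnyRef =
    Try(iceberg.loadTable(ident))            // Iceberg success path
      .getOrElse(bigQuery.loadTable(ident))  // failure: delegate to BigQuery
}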
Warning: Review ran into problems.
🔥 Problems: GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository. Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.
Force-pushed from fb95960 to 832b540
Actionable comments posted: 6
🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryMetastoreCatalog.scala (1)
Lines 8-16: Add error handling. Basic initialization looks good, but needs error handling.
 override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+  try {
     super.initialize(name, options)
     val vanillaIcebergCatalog = new SparkCatalog()
     vanillaIcebergCatalog.initialize(name, options)
     setDelegateCatalog(vanillaIcebergCatalog)
+  } catch {
+    case e: Exception =>
+      throw new RuntimeException(s"Failed to initialize BigQuery catalog: ${e.getMessage}", e)
+  }
 }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
⛔ Files ignored due to path filters (1)
cloud_gcp/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar is excluded by !**/*.jar
📒 Files selected for processing (6)
- catalog/BUILD.bazel (1 hunks)
- catalog/src/main/scala/ai/chronon/catalog/Catalog.scala (1 hunks)
- cloud_gcp/BUILD.bazel (4 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryMetastoreCatalog.scala (1 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (3 hunks)
- tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: bazel_config_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)
Lines 3-21: LGTM! Imports are well-organized and include necessary BigQuery and GCS dependencies.

Lines 101-113: LGTM! Well-documented reason for ignoring the test with clear setup instructions.
tools/build_rules/dependencies/maven_repository.bzl (1)
Line 63: ✅ Verification successful. Verify Iceberg version compatibility with Spark 3.5.
🌐 Web query:
What is the latest version of Apache Iceberg that is compatible with Spark 3.5?
💡 Result:
The latest version of Apache Iceberg compatible with Spark 3.5 is 1.8.0, as shown by:
- The official Apache Iceberg releases page listing 1.8.0 as the current version[1]
- Specific Spark 3.5 runtime JARs (for both Scala 2.12 and 2.13) included in the Iceberg 1.8.0 release artifacts[1][8]
- Documentation examples explicitly using
iceberg-spark-runtime-3.5_2.12:1.8.0
for integration[2]

This compatibility allows users to run commands like:

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0

Earlier versions like 1.6.x and 1.7.x also supported Spark 3.5[9], but 1.8.0 represents the current latest release as of February 2025.
Citations:
- 1: https://iceberg.apache.org/releases/
- 2: https://iceberg.apache.org/docs/1.6.1/spark-getting-started/
- 3: https://iceberg.apache.org/docs/nightly/spark-configuration/
- 4: https://community.databricks.com/t5/data-engineering/reading-an-iceberg-table-with-aws-glue-data-catalog-as-metastore/td-p/101142
- 5: https://github.com/apache/iceberg/releases
- 6: https://aws.amazon.com/blogs/big-data/amazon-emr-7-1-runtime-for-apache-spark-and-iceberg-can-run-spark-workloads-2-7-times-faster-than-apache-spark-3-5-1-and-iceberg-1-5-2/
- 7: https://github.com/apache/iceberg/actions/runs/6209553152/workflow
- 8: https://iceberg.apache.org/multi-engine-support/
- 9: https://aws.amazon.com/blogs/big-data/amazon-emr-7-5-runtime-for-apache-spark-and-iceberg-can-run-spark-workloads-3-6-times-faster-than-spark-3-5-3-and-iceberg-1-6-1/
- 10: https://projectnessie.org/iceberg/spark/
Apache Iceberg 1.8.0 is verified compatible with Spark 3.5.
- Confirmed per Apache releases and documentation.
- No update is required.
catalog/BUILD.bazel (outdated)

deps = [
],
Add required dependencies.
Missing dependencies for Spark/Iceberg catalog functionality.
Add these dependencies:
deps = [
+ maven_scala_artifact("org.apache.spark:spark-sql"),
+ maven_scala_artifact("org.apache.iceberg:iceberg-spark-runtime-3.5"),
],
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
deps = [
    maven_scala_artifact("org.apache.spark:spark-sql"),
    maven_scala_artifact("org.apache.iceberg:iceberg-spark-runtime-3.5"),
],
Force-pushed from 832b540 to 1a8d3e7
Force-pushed from 8b2e761 to 2889108
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)
Lines 82-177: Potential overhead from registering many classes.
Remove any unneeded classes to keep overhead low; one way to check which registrations are actually exercised is sketched below.
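A sketch of one way to find unneeded registrations, assuming a local Spark session (not part of this PR): run with Kryo registration required, and any class that gets serialized without being registered will fail loudly and name itself.

import org.apache.spark.SparkConf

// Sketch: registrationRequired=true makes Kryo throw for unregistered classes,
// which helps identify which entries in the registrator are actually used.
val kryoConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator")
  .set("spark.kryo.registrationRequired", "true")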
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2024-11-26T19:47:53.900Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (1)
Line 166: Consistent config application. Re-adding merged configs at this point looks good.
Force-pushed from 8de6087 to 71c07dc
Actionable comments posted: 1
♻️ Duplicate comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
Lines 39-41: ⚠️ Potential issue: Empty configuration values need to be set.
The following BigQuery metastore catalog settings are empty (a hedged example of supplying them is sketched below):
- warehouse
- gcp_location
- gcp_project
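A hedged sketch of how these settings might be supplied, with placeholder values and environment-variable fallbacks; the variable names and defaults here are assumptions, not project conventions.

// Sketch only: placeholder configuration for the empty catalog settings.
val catalogSettings: Map[String, String] = Map(
  "spark.sql.catalog.spark_catalog.warehouse"    -> sys.env.getOrElse("WAREHOUSE_PATH", "gs://<bucket>/warehouse"),
  "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", "us-central1"),
  "spark.sql.catalog.spark_catalog.gcp_project"  -> sys.env.getOrElse("GCP_PROJECT", "<project-id>")
)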
🧹 Nitpick comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)
Lines 96-97: Single URI requirement
Requiring only one URI may limit multi-file external tables.

Lines 137-139: Potential concurrency issue
Storing the delegate in a mutable var might cause race conditions in multi-threaded contexts; a thread-safe alternative is sketched below.
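A minimal sketch of a thread-safe alternative, assuming the delegate is a Spark CatalogPlugin; the class and method names here are illustrative, not the PR's code.

import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.sql.connector.catalog.CatalogPlugin

// Sketch: an AtomicReference gives volatile read/write semantics for the delegate,
// avoiding the races that a plain mutable var could allow.
class DelegateHolderSketch {
  private val delegateRef = new AtomicReference[CatalogPlugin]()

  def setDelegate(catalog: CatalogPlugin): Unit = delegateRef.set(catalog)
  def delegate: Option[CatalogPlugin] = Option(delegateRef.get())
}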
cloud_gcp/BUILD.bazel (1)

Line 89: Encourage added tests
Consider dedicated tests for the BigQuery Catalog integration changes.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (1)

Lines 123-141: Remove commented code. The old external table implementation is no longer needed.
-// val baseTableDef = ExternalTableDefinition
-//   .newBuilder(dataGlob, FormatOptions.parquet())
-//   .setAutodetect(true)
-//
-// if (partitionColumns.nonEmpty) {
-//   val timePartitioning = HivePartitioningOptions
-//     .newBuilder()
-//     .setFields(partitionColumns.toJava)
-//     .setSourceUriPrefix(path)
-//     .setMode("STRINGS")
-//     .build()
-//   baseTableDef.setHivePartitioningOptions(timePartitioning)
-// }
-//
-// val tableInfo = TableInfo.newBuilder(shadedTableId, baseTableDef.build).build()
-// val createdTable = bigQueryClient.create(tableInfo)
-//
-// println(s"Created external table ${createdTable.getTableId}")
-//

spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)

Lines 136-139: Consider performance implications of SQL-based loading. The switch to SQL-based table loading might have performance implications compared to direct loading (see the sketch below). Also applies to: 244-246, 745-747.
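For reference, the two access patterns being compared look roughly like this; the method names are assumptions for illustration, not the actual TableUtils code.

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch: direct catalog lookup vs. SQL-based loading.
def loadViaCatalog(spark: SparkSession, tableName: String): DataFrame =
  spark.table(tableName)                    // resolves through the catalog directly

def loadViaSql(spark: SparkSession, tableName: String): DataFrame =
  spark.sql(s"SELECT * FROM $tableName")    // goes through the SQL parser/analyzer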
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (9)
- .github/workflows/require_triggered_status_checks.yaml (1 hunks)
- cloud_gcp/BUILD.bazel (6 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (3 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (0 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (5 hunks)
- tools/build_rules/dependencies/maven_repository.bzl (3 hunks)
- tools/build_rules/dependencies/spark_repository.bzl (1 hunks)
💤 Files with no reviewable changes (1)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
🧰 Additional context used
🧠 Learnings (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (11)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: spark_tests
🔇 Additional comments (13)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)
Lines 21-27: Doc order differs from code
Doc suggests BigQuery→Iceberg→External, but code attempts Iceberg first.

Lines 80-110: Fallback handling
Ensure all exceptions are properly handled before calling defaultSessionCatalog.

tools/build_rules/dependencies/spark_repository.bzl (1)
Lines 38-39: Check excluded artifacts
Excluding Hadoop artifacts might cause missing classes if indirectly needed.

cloud_gcp/BUILD.bazel (2)

Lines 1-39: Version alignment check
Confirm these artifacts align correctly with Spark 3.5 and Hadoop components.

Lines 77-79: Local jar usage
No action needed since this is intentionally a local jar.

tools/build_rules/dependencies/maven_repository.bzl (2)
Line 46: LGTM! Added Iceberg Spark runtime dependency. The addition of the Iceberg Spark runtime dependency aligns with the PR objective of introducing the Iceberg BigQuery metastore catalog.

Lines 88-91: ❓ Verification inconclusive
Verify compatibility of BigQuery connector versions.
The downgrade of BigQuery connector dependencies from 3.0.3 to 2.2.26 needs verification.
🌐 Web query:
What is the latest stable version of Google Cloud BigQuery connector for Hadoop?
💡 Result:
The latest stable version of Google Cloud's BigQuery connector for Hadoop appears to be 3.0.4, released on November 12, 2024[8]. This update includes fixes for downscoping issues with gRPC and improvements for hierarchical namespace bucket operations[8].
For context:
- Earlier releases (like 2.2.x series) addressed compatibility with different Hadoop versions[8]
- The Spark BigQuery Connector (a related but separate component) reached 0.42.0 in January 2025 with Spark 3.5 support[3][7]
- Maven repositories list older connector versions like 0.13.4-hadoop3 and hadoop3-1.2.0, but these are superseded by newer releases[9][10]
Key capabilities in recent versions include direct BigQuery Storage API integration (avoiding intermediate GCS files)[5], predicate/column filtering optimizations[3], and improved authentication handling[1][8].
Citations:
- 1: https://aws.amazon.com/marketplace/pp/prodview-sqnd4gn5fykx6
- 2: https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage
- 3: https://github.com/GoogleCloudDataproc/spark-bigquery-connector
- 4: https://www.eweek.com/cloud/google-cloud-unveils-hadoop-connectors-new-app-engine-version/
- 5: https://cloud.google.com/dataproc/docs/concepts/connectors/bigquery
- 6: https://pekko.apache.org/docs/pekko-connectors/1.0.1/google-cloud-bigquery.html
- 7: https://cloud.google.com/bigquery/docs/release-notes
- 8: https://github.com/GoogleCloudDataproc/hadoop-connectors/releases
- 9: https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/bigquery-connector/0.7.3-hadoop2
- 10: https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/bigquery-connector/hadoop3-1.2.0
Action: Verify BigQuery connector dependency versions
- In tools/build_rules/dependencies/maven_repository.bzl (lines 88-91), dependencies were set to 2.2.26.
- The latest stable BigQuery connector for Hadoop is 3.0.4.
- Please confirm whether the downgrade is intentional and compatible, or whether an update to 3.0.4 is needed.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2)
Line 5: LGTM! Updated import for table creation status. The import change aligns with the new Iceberg-based table creation approach.

Lines 110-123: LGTM! Implemented Iceberg table creation. The new implementation correctly uses Iceberg format with proper partitioning support.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)
Lines 114-117: LGTM! Added catalog type verification test. Test verifies correct catalog implementation.

Lines 123-149: LGTM! Added Kryo serialization test. Comprehensive test for ResolvingFileIO serialization.

spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)

Lines 124-133: LGTM! Improved error handling. Using Try for better error handling in tableReachable.
.github/workflows/require_triggered_status_checks.yaml (1)
Line 4: Push trigger added. Concise and clear; expands trigger coverage as needed.
Force-pushed from 6c3b9a7 to a91c224
Actionable comments posted: 3
🧹 Nitpick comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
Lines 112-138: Add proper resource cleanup. Use try-with-resources to ensure proper cleanup of streams.
it should "kryo serialization for ResolvingFileIO" in { val registrator = new ChrononKryoRegistrator() val kryo = new Kryo(); kryo.setRegistrationRequired(true); kryo.setReferences(true); registrator.registerClasses(kryo) // Create an instance of ResolvingFileIO val original = new ResolvingFileIO(); original.initialize(Map.empty[String, String].asJava) // Serialize the object - val outputStream = new ByteArrayOutputStream(); - val output = new Output(outputStream); - kryo.writeClassAndObject(output, original); - output.close(); + val outputStream = new ByteArrayOutputStream() + try { + val output = new Output(outputStream) + try { + kryo.writeClassAndObject(output, original) + } finally { + output.close() + } // Deserialize the object - val inputStream = new ByteArrayInputStream(outputStream.toByteArray()); - val input = new Input(inputStream); - val deserializedObj = kryo.readClassAndObject(input); - input.close(); + val inputStream = new ByteArrayInputStream(outputStream.toByteArray()) + val input = new Input(inputStream) + try { + val deserializedObj = kryo.readClassAndObject(input) + assertNotNull("Deserialized object should not be null", deserializedObj) + assertTrue("Deserialized object should be an instance of ResolvingFileIO", + deserializedObj.isInstanceOf[ResolvingFileIO]) + } finally { + input.close() + } + } finally { + outputStream.close() + } - assertNotNull("Deserialized object should not be null", deserializedObj); - assertTrue("Deserialized object should be an instance of ResolvingFileIO", - deserializedObj.isInstanceOf[ResolvingFileIO]); }cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1)
Lines 28-38: Track the TODO for catalog delegation. The comment indicates future work needed for proper catalog delegation.
Would you like me to create an issue to track this TODO?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (13)
- .github/workflows/require_triggered_status_checks.yaml (1 hunks)
- cloud_gcp/BUILD.bazel (6 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (0 hunks)
- spark/BUILD.bazel (1 hunks)
- spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4 hunks)
- spark_install.json (12 hunks)
- tools/build_rules/dependencies/maven_repository.bzl (4 hunks)
- tools/build_rules/dependencies/spark_repository.bzl (1 hunks)
🔥 Files not summarized due to errors (1)
- spark_install.json: Error: Server error: no LLM provider could handle the message
💤 Files with no reviewable changes (1)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
🚧 Files skipped from review as they are similar to previous changes (5)
- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
- .github/workflows/require_triggered_status_checks.yaml
- tools/build_rules/dependencies/spark_repository.bzl
- cloud_gcp/BUILD.bazel
- tools/build_rules/dependencies/maven_repository.bzl
🧰 Additional context used
🧠 Learnings (2)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2024-11-26T19:47:53.900Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
⏰ Context from checks skipped due to timeout of 90000ms (17)
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: bazel_config_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (25)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)
3-19
: LGTM!Imports are well-organized and necessary for the new functionality.
39-41
: Empty configuration values need to be set.The following BigQuery metastore catalog settings are empty:
warehouse
gcp_location
gcp_project
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2)
Lines 110-123: LGTM! Clean transition to Iceberg tables. The implementation correctly handles table creation and partitioning using Iceberg format.

Line 12: LGTM! Required imports added. The new imports support the Iceberg table implementation.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)
Lines 39-58: LGTM! Well-structured table delegation. Clean implementation that properly delegates operations while preserving additional properties.

Lines 94-132: LGTM! Robust catalog implementation with proper fallback. The loadTable method gracefully handles failures and falls back to BigQuery when needed.
spark/BUILD.bazel (1)
Line 48: LGTM! Consistent dependency management. Changed to maven_artifact to align with the project's dependency resolution approach.
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (3)
Lines 124-133: LGTM! Improved error handling pattern. Using Try-match is more idiomatic Scala and improves readability.

Lines 136-139: LGTM! Simplified table access. Direct SQL queries provide cleaner access to table data and schema. Also applies to: 243-246.

Lines 745-746: LGTM! Consistent table access pattern. Using a SQL query aligns with the changes in loadTable and getSchemaFromTable.
spark_install.json (12)
Lines 1-7: Hashes updated. Auto-gen hash fields; verify new values.
Lines 1022-1031: Hadoop Auth update. Hashes and version for hadoop-auth updated.
Lines 2381-2388: Curator-client deps. Added zookeeper and slf4j entries; check necessity.
Lines 2431-2443: Yarn-common deps. New guice and jersey modules added; looks good.
Lines 2448-2456: Jackson modules. Adjusted jackson dependencies; approved.
Lines 2560-2564: Hive shims. Added hive-shims-common; concise update.
Lines 2571-2577: Hive scheduler. Included hive-shims-common in scheduler deps.
Lines 3072-3079: libthrift deps. Added httpcore and slf4j; integration looks fine.
Lines 3111-3118: Zookeeper deps. Added slf4j entries; ensure consistency.
Lines 8500-8506: Jasper runtime. Reordered/dependency update; simple change.
Lines 9073-9079: Tomcat runtime. Updated jasper-runtime dependency.
Lines 10149-10155: Jasper-runtime sources. Source ref updated; approved.

spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (3)
Lines 118-119: LGTM! Iceberg classes added for BigQuery metastore. The addition of Iceberg classes aligns with the PR objective of introducing the Iceberg BigQuery metastore catalog.

Lines 99-102: LGTM! Java util classes added. Common Java collection classes added for serialization optimization.

Lines 107-117: ❓ Verification inconclusive. Verify Hadoop FS classes usage.
Several Hadoop FS classes are registered. Ensure they are required for the Iceberg integration. Also applies to: 166-167.

🏁 Script executed:

#!/bin/bash
# Search for usage of Hadoop FS classes
rg "FileUtil|BlockLocation|FileStatus|Path|StorageType|FsAction|FsPermission" --type scala

Length of output: 35663
Hadoop FS Registrations & Iceberg Integration Verification
- In ChrononKryoRegistrator.scala (lines 107–117 & 166–167), several Hadoop FS classes (e.g., BlockLocation, FileStatus, various FileUtil variants, etc.) are registered.
- Grep results don’t reveal direct usage tied to Iceberg integration—usage seems isolated to this registrator.
- Action: Please double-check if these registrations are essential for Iceberg support or if they can be removed.
"org.apache.spark.sql.types.Metadata", | ||
"ai.chronon.api.Row", | ||
"ai.chronon.spark.KeyWithHash", | ||
"ai.chronon.aggregator.base.ApproxHistogramIr", | ||
"ai.chronon.aggregator.base.MomentsIR", | ||
"ai.chronon.aggregator.windowing.BatchIr", | ||
"ai.chronon.aggregator.base.ApproxHistogramIr", | ||
"ai.chronon.online.RowWrapper", | ||
"ai.chronon.online.fetcher.Fetcher$Request", |
🛠️ Refactor suggestion
Remove duplicate entry.
ai.chronon.online.fetcher.Fetcher$Request and ai.chronon.online.Fetcher$Request refer to the same class.
- "ai.chronon.online.fetcher.Fetcher$Request",
"ai.chronon.aggregator.windowing.FinalBatchIr",
"ai.chronon.api.Row",
- "ai.chronon.online.Fetcher$Request",
Also applies to: 90-90
"org.apache.spark.sql.types.Metadata", | ||
"ai.chronon.api.Row", | ||
"ai.chronon.spark.KeyWithHash", | ||
"ai.chronon.aggregator.base.ApproxHistogramIr", | ||
"ai.chronon.aggregator.base.MomentsIR", |
🛠️ Refactor suggestion
Remove duplicate entry.
ai.chronon.aggregator.base.ApproxHistogramIr is registered twice.
- "ai.chronon.aggregator.base.ApproxHistogramIr",
"ai.chronon.aggregator.base.MomentsIR",
"ai.chronon.aggregator.windowing.BatchIr",
- "ai.chronon.aggregator.base.ApproxHistogramIr",
Also applies to: 85-85
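A defensive option (a sketch, not the registrator's actual code) is to de-duplicate the class-name list before registering, so repeated entries like the ones above become harmless:

import com.esotericsoftware.kryo.Kryo

// Sketch: register each distinct class name once.
def registerByName(kryo: Kryo, classNames: Seq[String]): Unit =
  classNames.distinct.foreach(name => kryo.register(Class.forName(name)))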
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (6)
- cloud_gcp/BUILD.bazel (3 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
- spark/BUILD.bazel (2 hunks)
- spark_install.json (10 hunks)
- tools/build_rules/dependencies/maven_repository.bzl (4 hunks)
- tools/build_rules/dependencies/spark_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- tools/build_rules/dependencies/spark_repository.bzl
- spark/BUILD.bazel
- tools/build_rules/dependencies/maven_repository.bzl
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (15)
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: groupby_tests
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (16)
cloud_gcp/BUILD.bazel (4)
Lines 1-39: LGTM! Dependencies consolidated effectively. The shared_deps variable improves maintainability by centralizing dependency management.

Lines 49-56: LGTM! Binary configuration looks good. The runtime dependencies are properly specified.

Lines 77-81: LGTM! Local jar configuration is correct. Based on the past review comments and learnings, this is intentionally a local jar dependency.

Lines 41-47: ❓ Verification inconclusive. Verify format=True impact.
Ensure the format flag doesn't conflict with existing formatters.

🏁 Script executed:

#!/bin/bash
# Check for other formatting tools in the project
fd -t f '\.scalafmt\.conf|\.scalafix\.conf|\.scalastyle|\.scalafmt'

Length of output: 69
Action: Verify Scala formatter config absence manually.
- In cloud_gcp/BUILD.bazel (lines 41-47), format=True is set.
- The search script showed no formatter configurations (e.g., .scalafmt.conf, .scalafix.conf, .scalastyle), but please manually verify their absence to ensure no conflicts.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)
Lines 3-19: LGTM! Imports are well organized. The new imports support the added Kryo serialization test.

Lines 112-138: LGTM! Kryo serialization test is thorough. The test properly validates serialization and deserialization of ResolvingFileIO.
spark_install.json (10)
Lines 3-4: Hash update validation. Artifact hash values updated to positive integers. Verify these values align with the new resolution process.
Lines 2308-2329: Dependency adjustments. Added "org.slf4j:slf4j-api" in curator-client and "io.netty:netty" for hadoop-mapreduce-client-core. Confirm these align with your dependency strategy.
Lines 2456-2465: Hive shim update. Included "commons-lang:commons-lang" in hive-shims-common and linked hive-shims-scheduler to it. Looks correct.
Lines 2515-2522: OpenCSV dependency added. "net.sf.opencsv:opencsv" now appears under hive-serde.
Lines 2529-2535: Hive shims scheduler inclusion. "org.apache.hive.shims:hive-shims-scheduler" has been added to the hive-shims list.
Lines 2948-2954: Libthrift enhancements. Added "org.apache.httpcomponents:httpcore" and ensured "org.slf4j:slf4j-api" is present. Verify versions and compatibility.
Lines 2972-2978: Twill discovery update. Added "org.apache.twill:twill-discovery-api" under twill-discovery-core.
Lines 8139-8145: Jasper runtime update. Updated "tomcat:jasper-runtime"; ensure consistency with related dependency listings.
Lines 8692-8698: Jasper runtime consistency. Reaffirms the "tomcat:jasper-runtime" entry.
Lines 9768-9774: Jasper runtime JAR source. Updated "tomcat:jasper-runtime:jar:sources". Looks good.
"spark.sql.catalog.spark_catalog.catalog-impl" -> classOf[BQMSCatalog].getName, | ||
"spark.sql.catalog.spark_catalog" -> classOf[DelegatingBigQueryMetastoreCatalog].getName, | ||
"spark.sql.catalog.spark_catalog.io-impl" -> classOf[ResolvingFileIO].getName, | ||
"spark.sql.defaultUrlStreamHandlerFactory.enabled" -> false.toString, | ||
// set the following | ||
"spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/", | ||
"spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1", | ||
"spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022", |
🛠️ Refactor suggestion
Hardcoded configuration values should be externalized.
Move warehouse, location, and project values to configuration files.
- "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
- "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
- "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
+ "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
+ "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
+ "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
"spark.sql.catalog.spark_catalog.catalog-impl" -> classOf[BQMSCatalog].getName, | |
"spark.sql.catalog.spark_catalog" -> classOf[DelegatingBigQueryMetastoreCatalog].getName, | |
"spark.sql.catalog.spark_catalog.io-impl" -> classOf[ResolvingFileIO].getName, | |
"spark.sql.defaultUrlStreamHandlerFactory.enabled" -> false.toString, | |
// set the following | |
"spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/", | |
"spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1", | |
"spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022", | |
"spark.sql.catalog.spark_catalog.catalog-impl" -> classOf[BQMSCatalog].getName, | |
"spark.sql.catalog.spark_catalog" -> classOf[DelegatingBigQueryMetastoreCatalog].getName, | |
"spark.sql.catalog.spark_catalog.io-impl" -> classOf[ResolvingFileIO].getName, | |
"spark.sql.defaultUrlStreamHandlerFactory.enabled" -> false.toString, | |
// set the following | |
"spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""), | |
"spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""), | |
"spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""), |
Actionable comments posted: 1
🧹 Nitpick comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (3)
Lines 94-132: Simplify error handling in loadTable. The nested match expressions and error handling make the code hard to follow.
 override def loadTable(ident: Identifier): Table = {
-  Try { icebergCatalog.loadTable(ident) }
-    .recover {
-      case _ => {
-        val tId = ident.namespace().toList match {
+  def loadBigQueryTable: Table = {
+    val tId = ident.namespace().toList match {
       case database :: Nil => TableId.of(database, ident.name())
       case project :: database :: Nil => TableId.of(project, database, ident.name())
-        }
-        val table = bigQueryClient.getTable(tId)
+      case _ => throw new IllegalArgumentException(s"Invalid namespace: ${ident.namespace().mkString(".")}")
+    }
+    val table = bigQueryClient.getTable(tId)
+    table.getDefinition match {
+      case externalTable: ExternalTableDefinition => loadExternalTable(tId, externalTable)
+      case _: StandardTableDefinition => loadStandardTable(ident)
+      case _ => throw new IllegalStateException(s"Unsupported table type: ${table.getFriendlyName}")
+    }
+  }
+
+  Try(icebergCatalog.loadTable(ident))
+    .orElse(Try(loadBigQueryTable))
+    .getOrElse(defaultSessionCatalog.asInstanceOf[TableCatalog].loadTable(ident))
 }
Lines 104-122: Improve URI handling for external tables. The URI handling is fragile and could fail with different URI patterns.

- val uri = scala
-   .Option(externalTable.getHivePartitioningOptions)
-   .map(_.getSourceUriPrefix)
-   .getOrElse {
-     require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
-     uris.head.replaceAll("/\*\.parquet$", "")
-   }
+ val uri = scala.Option(externalTable.getHivePartitioningOptions)
+   .map(_.getSourceUriPrefix)
+   .getOrElse {
+     require(uris.nonEmpty, s"External table ${table} must have at least one URI")
+     val baseUri = uris.head.replaceAll("/\*\.parquet$", "")
+     require(uris.tail.forall(_.startsWith(baseUri)),
+       s"All URIs must share the same prefix: $baseUri")
+     baseUri
+   }
Lines 28-30: Address TODO comment. The TODO comment indicates missing functionality for proper catalog delegation.
Would you like me to help create unit tests or implement the chronon catalog delegation?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4 hunks)
- tools/build_rules/dependencies/spark_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- tools/build_rules/dependencies/spark_repository.bzl
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala
🔇 Additional comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)
Lines 1-27: LGTM! Imports are well-organized and comprehensive.

Lines 60-63: LGTM! Clean factory method implementation.
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/BUILD.bazel (1)
Lines 79-83: Consider documenting the local jar dependency. Add a comment explaining why this is a local jar.

 java_import(
     name = "iceberg_bigquery_catalog_lib",
+    # Local jar dependency - custom build for BigQuery catalog integration
     jars = ["iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar"],
     visibility = ["//visibility:public"],
 )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (5)
- cloud_gcp/BUILD.bazel (3 hunks)
- cloud_gcp/src/main/resources/services/org.apache.spark.sql.sources.DataSourceRegister (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (3 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/RenamedBigQueryRelationProvider.scala (1 hunks)
- tools/build_rules/dependencies/maven_repository.bzl (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
- tools/build_rules/dependencies/maven_repository.bzl
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: bazel_config_tests
- GitHub Check: non_spark_tests
🔇 Additional comments (4)
cloud_gcp/src/main/resources/services/org.apache.spark.sql.sources.DataSourceRegister (1)
Line 1: LGTM! Correctly registers the BigQuery data source provider.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/RenamedBigQueryRelationProvider.scala (1)
Lines 5-7: ❓ Verification inconclusive. Verify usage of renamed provider.
Implementation looks good. Ensure consumers are updated to use "bigquery-renamed".

🏁 Script executed:

#!/bin/bash
# Search for usage of the renamed provider
rg "bigquery-renamed" --type scala

Length of output: 187
Ensure consumer updates are verified.
The class implementation is correct. However, note that our search only shows the "bigquery-renamed" string in this file. Please manually verify that all consumers reference the new name.

cloud_gcp/BUILD.bazel (2)
Lines 1-40: LGTM! Dependencies consolidated effectively. Clean organization of shared dependencies.

Lines 42-49: LGTM! Library configuration is correct. The resources glob pattern will include the DataSourceRegister file.
Actionable comments posted: 0
♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1)
Lines 42-46: ⚠️ Potential issue: Add type checking before casting.
Previous review comment still applies.

- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
-   internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+   require(internalTable.isInstanceOf[SupportsRead], "Table does not support reading")
+   internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+ }

- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
-   internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+   require(internalTable.isInstanceOf[SupportsWrite], "Table does not support writing")
+   internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+ }
🧹 Nitpick comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (3)
Lines 97-104: Simplify URI extraction logic. Extract URI handling into a separate method for better readability.

+ private def extractUri(externalTable: ExternalTableDefinition): String = {
+   val uris = externalTable.getSourceUris.asScala
+   scala.Option(externalTable.getHivePartitioningOptions)
+     .map(_.getSourceUriPrefix)
+     .getOrElse {
+       require(uris.size == 1, s"External table can be backed by only one URI.")
+       uris.head.replaceAll("/\*\.parquet$", "")
+     }
+ }
Lines 86-124: Improve error handling in loadTable. Add specific error types and logging.

- Try { icebergCatalog.loadTable(ident) }
+ Try(icebergCatalog.loadTable(ident))
+   .recoverWith {
+     case e: IllegalArgumentException =>
+       // Log specific error, then fall back to BigQuery
+       Try(loadBigQueryTable(ident))
+     case e: Exception =>
+       // Log unexpected error, then fall back to BigQuery
+       Try(loadBigQueryTable(ident))
+   }
Lines 59-64: Consider making clients configurable. Allow injection of BigQuery and Iceberg clients for testing; a hedged sketch follows below.
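Since Spark instantiates catalog plugins reflectively with a no-arg constructor, one testable shape is an overridable factory method rather than constructor parameters. A hedged sketch follows; the class and method names are illustrative, not the PR's actual code.

import com.google.cloud.bigquery.{BigQuery, BigQueryOptions}

// Sketch: client creation behind a protected factory that tests can override.
class ConfigurableClientsSketch {
  protected def createBigQueryClient(): BigQuery =
    BigQueryOptions.getDefaultInstance.getService

  lazy val bigQueryClient: BigQuery = createBigQueryClient()
}

// In a test, a subclass could override createBigQueryClient() to return a stub.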
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- cloud_gcp/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (17)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: bazel_config_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
cloud_gcp/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (1)
Line 1: LGTM! Correctly registers the BigQuery relation provider.
Actionable comments posted: 1
🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)
Lines 85-123: Simplify loadTable recovery logic. The recovery path is complex and could be extracted into a separate method for better readability.

 override def loadTable(ident: Identifier): Table = {
   Try { icebergCatalog.loadTable(ident) }
-    .recover {
-      case _ => {
-        val tId = ident.namespace().toList match {
-          case database :: Nil => TableId.of(database, ident.name())
-          case project :: database :: Nil => TableId.of(project, database, ident.name())
-        }
-        val table = bigQueryClient.getTable(tId)
-        table.getDefinition.asInstanceOf[TableDefinition] match {
-          case externalTable: ExternalTableDefinition => {
-            // ... external table handling
-          }
-          case _: StandardTableDefinition => {
-            // ... standard table handling
-          }
-          case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
-        }
-      }
-    }
+    .recover { case _ => loadBigQueryTable(ident) }
     .getOrElse(defaultSessionCatalog.asInstanceOf[TableCatalog].loadTable(ident))
 }
+
+private def loadBigQueryTable(ident: Identifier): Table = {
+  val tId = createTableId(ident)
+  val table = bigQueryClient.getTable(tId)
+  table.getDefinition.asInstanceOf[TableDefinition] match {
+    case externalTable: ExternalTableDefinition => handleExternalTable(externalTable, tId)
+    case _: StandardTableDefinition => handleStandardTable(ident)
+    case _ => throw new UnsupportedOperationException(
+      s"Table type '${table.getDefinition.getType}' is not supported for table: ${table.getFriendlyName}")
+  }
+}
Line 118: Enhance error message. Make the error message more descriptive by including the actual table type.

- case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
+ case _ => throw new UnsupportedOperationException(
+   s"Table type '${table.getDefinition.getType}' is not supported for table: ${table.getFriendlyName}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: groupby_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)
Lines 41-45: Add type checking before casting. The unsafe casts could fail at runtime if the internal table doesn't support read/write operations.

Lines 51-54: LGTM! Clean factory method implementation.
Actionable comments posted: 0
♻️ Duplicate comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
Lines 41-43: 🛠️ Refactor suggestion: Externalize configuration values.
Move hardcoded warehouse path, location, and project values to configuration files.

- "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
- "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
- "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
+ "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
+ "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
+ "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),
🧹 Nitpick comments (2)
cloud_gcp/BUILD.bazel (1)
Lines 1-40: Consider organizing dependencies by category. Group dependencies into categories (e.g., core, testing, logging) for better maintainability.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
Lines 75-84: Document test requirements. Add clear documentation for the ignored integration tests explaining how to run them locally.
Also applies to: 86-98, 100-112
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- cloud_gcp/BUILD.bazel (2 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: non_spark_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: bazel_config_tests
🔇 Additional comments (6)
cloud_gcp/BUILD.bazel (4)
Lines 42-49: LGTM! Good use of shared dependencies. The scala_library definition is well-structured with proper visibility and resource handling.

Lines 51-58: LGTM! Runtime dependencies are properly configured. The jvm_binary configuration correctly includes necessary runtime dependencies.

Lines 79-83: Local jar dependency noted. Based on previous feedback, this is intentionally using a local jar file.

Lines 85-93: LGTM! Test configuration is comprehensive. The test suite properly includes shared dependencies and test-specific dependencies.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)
Lines 3-17: LGTM! Well-organized imports. Imports are logically grouped and necessary for the functionality.

Lines 118-144: LGTM! Comprehensive Kryo serialization test. The test thoroughly validates serialization and deserialization of ResolvingFileIO.
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
Lines 113-139: Add state verification for deserialized object. Add assertions to verify the internal state of the deserialized ResolvingFileIO matches the original.

  assertTrue("Deserialized object should be an instance of ResolvingFileIO",
    deserializedObj.isInstanceOf[ResolvingFileIO]);
+ val deserialized = deserializedObj.asInstanceOf[ResolvingFileIO]
+ assertEquals("Deserialized object should have same configuration",
+   original.properties(), deserialized.properties())
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- cloud_gcp/BUILD.bazel (2 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- cloud_gcp/BUILD.bazel
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: bazel_config_tests
🔇 Additional comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)
Lines 40-42: Externalize configuration values. Hardcoded GCP configuration values should be moved to environment variables.

- "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
- "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
- "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
+ "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
+ "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
+ "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),
Lines 48-52: LGTM! Test cases properly verify runtime classes and GCP providers. Also applies to: 54-59.
Actionable comments posted: 0
🧹 Nitpick comments (2)
cloud_gcp/BUILD.bazel (2)
Lines 1-41: Consider organizing shared_deps by purpose. Group related dependencies (e.g., Google Cloud, Scala, Logging) for better maintainability.

Lines 38-40: Remove commented code. Delete the commented spark-bigquery-with-dependencies line.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
- cloud_gcp/BUILD.bazel (2 hunks)
- tools/build_rules/dependencies/scala_2_12_repository.bzl (1 hunks)
- tools/build_rules/dependencies/scala_2_13_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- tools/build_rules/dependencies/scala_2_12_repository.bzl
- tools/build_rules/dependencies/scala_2_13_repository.bzl
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: bazel_config_tests
🔇 Additional comments (4)
cloud_gcp/BUILD.bazel (4)
43-50
: LGTM! Good use of shared_deps. Clean configuration with proper visibility and resource handling.
80-84
: Using local jar as previously discussed. Based on past feedback, this is intentionally a local dependency.
86-94
: LGTM! Comprehensive test configuration. Good inclusion of resources and proper dependency composition.
52-59
:❓ Verification inconclusive
Verify runtime dependencies.
Check if all required Iceberg dependencies are included for the submitter.
🏁 Script executed:
#!/bin/bash
# Search for Iceberg-related dependencies in the codebase
rg "iceberg" -g "BUILD*"
Length of output: 440
Verify Iceberg dependencies for submitter
- Confirm that "iceberg-spark-runtime-3.5" is sufficient.
- If the submitter needs BigQuery support, add ":iceberg_bigquery_catalog_lib" (declared in the BUILD file).
Actionable comments posted: 1
🧹 Nitpick comments (3)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (3)
48-50
: Add more assertions for runtime class verification. Add assertions for class availability and configuration.
it should "google runtime classes are available" in { assertTrue(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.isInstanceOf[HadoopConfigurationProperty[_]]) + assertNotNull(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.getDefault) + assertTrue(GoogleHadoopFileSystemConfiguration.values().nonEmpty) assertCompiles("classOf[GoogleHadoopFileSystem]") assertCompiles("classOf[GoogleHadoopFS]") }
110-136
: Add edge case tests for serialization. Test serialization with non-empty configuration.
val original = new ResolvingFileIO(); - original.initialize(Map.empty[String, String].asJava) + val config = Map("fs.gs.project.id" -> "test-project") + original.initialize(config.asJava)
74-83
: Address ignored integration tests. Multiple integration tests are ignored. Consider using test containers or mocks.
Would you like help setting up BigQuery test containers or mocks for these tests?
Also applies to: 85-97, 99-105
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (5)
cloud_gcp/BUILD.bazel
(2 hunks)cloud_gcp/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
(1 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
(3 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala
(1 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- cloud_gcp/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
- cloud_gcp/BUILD.bazel
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: bazel_config_tests
- GitHub Check: fetcher_tests
🔇 Additional comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala (1)
66-73
: LGTM! Good package encapsulation. Visibility change enables reuse within the cloud_gcp package while maintaining encapsulation.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
40-42
: Externalize configuration values.Hardcoded GCP configuration values should be moved to environment variables.
- "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/", - "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1", - "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022", + "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""), + "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""), + "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),
val tblFormat = GcpFormatProvider(spark).format("data.purchases").get
val partitions = tblFormat.partitions("data.purchases")(spark)
assertEquals(partitions.flatMap(_.keys), Seq("ds"))
}
🛠️ Refactor suggestion
Add table existence check.
Verify table exists before testing partitions.
+ assertNotNull("Table should exist", spark.catalog.getTable("data.purchases"))
val tblFormat = GcpFormatProvider(spark).format("data.purchases").get
val partitions = tblFormat.partitions("data.purchases")(spark)
assertEquals(partitions.flatMap(_.keys), Seq("ds"))
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
  val tblFormat = GcpFormatProvider(spark).format("data.purchases").get
  val partitions = tblFormat.partitions("data.purchases")(spark)
  assertEquals(partitions.flatMap(_.keys), Seq("ds"))
}
+ assertNotNull("Table should exist", spark.catalog.getTable("data.purchases"))
  val tblFormat = GcpFormatProvider(spark).format("data.purchases").get
  val partitions = tblFormat.partitions("data.purchases")(spark)
  assertEquals(partitions.flatMap(_.keys), Seq("ds"))
}
Actionable comments posted: 0
🧹 Nitpick comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
110-136
: Add cleanup for Kryo serialization test.
  assertTrue("Deserialized object should be an instance of ResolvingFileIO", deserializedObj.isInstanceOf[ResolvingFileIO]);
+ // Cleanup resources
+ outputStream.close()
+ inputStream.close()
}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
(2 hunks)tools/build_rules/dependencies/scala_2_12_repository.bzl
(1 hunks)tools/build_rules/dependencies/scala_2_13_repository.bzl
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- tools/build_rules/dependencies/scala_2_12_repository.bzl
- tools/build_rules/dependencies/scala_2_13_repository.bzl
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: groupby_tests
- GitHub Check: non_spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: fetcher_tests
- GitHub Check: bazel_config_tests
🔇 Additional comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)
40-42
: Move hardcoded GCP configuration to environment variables.
- "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
- "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
- "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
+ "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
+ "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
+ "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),
105-107
: Add table existence check before testing partitions.
+ assertNotNull("Table should exist", spark.catalog.getTable("data.purchases"))
  val tblFormat = GcpFormatProvider(spark).format("data.purchases").get
  val partitions = tblFormat.partitions("data.purchases")(spark)
Actionable comments posted: 0
🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)
85-124
: Improve error handling in loadTable. Missing specific error handling for different failure scenarios.
override def loadTable(ident: Identifier): Table = { - Try { icebergCatalog.loadTable(ident) } + Try(icebergCatalog.loadTable(ident)) .recover { - case _ => { + case e: Exception => { + log.debug(s"Failed to load table from Iceberg catalog: ${e.getMessage}")
58-63
: Consider eager initialization of critical services. Lazy initialization could delay failure detection.
- @transient private lazy val bqOptions = BigQueryOptions.getDefaultInstance - @transient private lazy val bigQueryClient: BigQuery = bqOptions.getService + @transient private val bqOptions = BigQueryOptions.getDefaultInstance + @transient private val bigQueryClient: BigQuery = bqOptions.getService
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (4)
cloud_gcp/BUILD.bazel
(3 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala
(1 hunks)tools/build_rules/dependencies/scala_2_12_repository.bzl
(1 hunks)tools/build_rules/dependencies/scala_2_13_repository.bzl
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- tools/build_rules/dependencies/scala_2_13_repository.bzl
- tools/build_rules/dependencies/scala_2_12_repository.bzl
- cloud_gcp/BUILD.bazel
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: bazel_config_tests
🔇 Additional comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (3)
41-45
: Add type checking before casting.Unsafe casts could fail at runtime.
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = - internalTable.asInstanceOf[SupportsRead].newScanBuilder(options) + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + require(internalTable.isInstanceOf[SupportsRead], "Table does not support reading") + internalTable.asInstanceOf[SupportsRead].newScanBuilder(options) + } - override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = - internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { + require(internalTable.isInstanceOf[SupportsWrite], "Table does not support writing") + internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info) + }
51-54
: LGTM!Clean factory method implementation.
96-104
: Enhance URI pattern validation.Silent failures possible if URI pattern doesn't match expectations.
- uris.head.replaceAll("/\\*\\.parquet$", "") + val uri = uris.head + val pattern = "/\\*\\.parquet$" + val newUri = uri.replaceAll(pattern, "") + require(newUri != uri, s"URI $uri does not match expected pattern */*.parquet") + newUri
Actionable comments posted: 1
♻️ Duplicate comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)
41-45
:⚠️ Potential issueAdd type checking before casting.
Unsafe casts could fail at runtime.
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = - internalTable.asInstanceOf[SupportsRead].newScanBuilder(options) + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + require(internalTable.isInstanceOf[SupportsRead], "Table does not support reading") + internalTable.asInstanceOf[SupportsRead].newScanBuilder(options) + } - override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = - internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { + require(internalTable.isInstanceOf[SupportsWrite], "Table does not support writing") + internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info) + }
41-45
:⚠️ Potential issueAdd type checking before casting.
Unsafe casts could fail at runtime.
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = - internalTable.asInstanceOf[SupportsRead].newScanBuilder(options) + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + require(internalTable.isInstanceOf[SupportsRead], "Table does not support reading") + internalTable.asInstanceOf[SupportsRead].newScanBuilder(options) + } - override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = - internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info) + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { + require(internalTable.isInstanceOf[SupportsWrite], "Table does not support writing") + internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info) + }
🧹 Nitpick comments (8)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (4)
101-104
: Add assertions for partition values.Verify partition values before comparison.
+ assertNotNull("Partitions should not be null", externalPartitions) assertEquals(Seq("2023-11-30"), externalPartitions) + assertNotNull("Partitions should not be null", nativePartitions) assertEquals(Set(20231118, 20231122, 20231125, 20231102, 20231123, 20231119, 20231130, 20231101, 20231117, 20231110, 20231108, 20231112, 20231115, 20231116, 20231113, 20231104, 20231103, 20231106, 20231121, 20231124, 20231128, 20231109, 20231127, 20231129, 20231126, 20231114, 20231107, 20231111, 20231120, 20231105).map(_.toString), nativePartitions.toSet)
99-104
: Improve test data management.Replace hardcoded partition values with test constants or data providers.
- assertEquals(Set(20231118, 20231122, 20231125, 20231102, ...), nativePartitions.toSet) + val EXPECTED_PARTITIONS = Set(20231118, 20231122, /* ... */) + assertEquals(EXPECTED_PARTITIONS, nativePartitions.toSet)
48-52
: Improve type checking in the test.Replace wildcard type with explicit type parameter.
- assertTrue(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.isInstanceOf[HadoopConfigurationProperty[_]]) + assertTrue(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.isInstanceOf[HadoopConfigurationProperty[Long]])
101-104
: Magic numbers in test assertions.Extract date values into named constants.
+ private val EXPECTED_EXTERNAL_PARTITION = "2023-11-30" + private val EXPECTED_NATIVE_PARTITIONS = Set(20231118, 20231122, 20231125, 20231102, /* ... */) - assertEquals(Seq("2023-11-30"), externalPartitions) + assertEquals(Seq(EXPECTED_EXTERNAL_PARTITION), externalPartitions) - assertEquals(Set(20231118, 20231122, /* ... */), nativePartitions.toSet) + assertEquals(EXPECTED_NATIVE_PARTITIONS, nativePartitions.toSet)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (4)
98-105
: Enhance URI pattern validation.Add explicit validation for URI patterns.
val uris = externalTable.getSourceUris.asScala val uri = scala .Option(externalTable.getHivePartitioningOptions) .map(_.getSourceUriPrefix) .getOrElse { require(uris.size == 1, s"External table ${table} can be backed by only one URI.") - uris.head.replaceAll("/\\*\\.parquet$", "") + val uri = uris.head + require(uri.endsWith("/*.parquet"), s"URI must end with /*.parquet: $uri") + uri.replaceAll("/\\*\\.parquet$", "") }
117-119
: Document the BigQueryCatalog bug workaround.Add more context about the bug and workaround.
- // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId. + // TODO: Remove workaround once https://issues.apache.org/jira/browse/SPARK-XXXXX is fixed + // BigQueryCatalog ignores projectId when loading tables, so we pass only dataset and table val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))
117-119
: Document the BigQueryCatalog bug workaround.Add a link to the bug tracker issue for future reference.
- // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId. + // TODO: Remove workaround once fixed: https://issues.apache.org/jira/browse/SPARK-XXXXX + // BigQueryCatalog currently ignores the projectId, so we need to work around it
85-126
: Improve error handling in loadTable.Add specific error types and logging.
- Try { icebergCatalog.loadTable(ident) } + Try { + icebergCatalog.loadTable(ident) + }.recover { + case e: NoSuchTableException => + logger.debug(s"Table not found in Iceberg catalog: ${ident}", e) + loadBigQueryTable(ident) + case e: Exception => + logger.warn(s"Failed to load table from Iceberg catalog: ${ident}", e) + loadBigQueryTable(ident) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala
(1 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
(2 hunks)
👮 Files not reviewed due to content moderation or server errors (2)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: bazel_config_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: non_spark_tests
🔇 Additional comments (8)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (6)
35-42
: Hardcoded configuration values should be externalized.Move sensitive configuration values to environment variables.
- "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/", - "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1", - "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022", + "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""), + "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""), + "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),
111-137
: LGTM! Well-structured Kryo serialization test.The test thoroughly validates serialization and deserialization of ResolvingFileIO.
40-42
: Hardcoded configuration values should be externalized.Move warehouse, location, and project values to configuration files.
111-137
: Well-structured serialization test!Good coverage of serialization/deserialization with proper resource cleanup.
35-42
: Hardcoded configuration values should be externalized.
111-137
: LGTM! Good test coverage for serialization.The test thoroughly verifies Kryo serialization/deserialization of ResolvingFileIO.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)
41-45
: Add type checking before casting.
98-105
: Enhance URI Pattern Validation.
val uris = externalTable.getSourceUris.asScala
val uri = scala
  .Option(externalTable.getHivePartitioningOptions)
  .map(_.getSourceUriPrefix)
  .getOrElse {
    require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
    uris.head.replaceAll("/\\*\\.parquet$", "")
  }
💡 Verification agent
🧩 Analysis chain
Enhance URI pattern validation.
URI handling could fail silently if pattern doesn't match.
🏁 Script executed:
#!/bin/bash
# Check for non-standard URI patterns in external tables
rg -A 2 'setSourceUris|setSourceUriPrefix' .
Length of output: 963
Update URI Validation Logic
- In DelegatingBigQueryMetastoreCatalog.scala (lines 98–105), the call to replaceAll("/\\*\\.parquet$", "") may leave the URI unchanged if it doesn't match exactly, causing silent failures.
- Please add explicit validation (or a warning/error) to detect when the regex doesn't affect the URI, ensuring that non-standard patterns don't slip through.
couple of comments. this one should be safe to merge first right since I think #424 actually plugs it in?
wait i forgot that PR merges into this one lol
}
}

class GCSFileIOSerializer extends Serializer[GCSFileIO] {
Can we leave a comment that we're doing this custom serializer because Kryo doesn't serialize the GCSFileIO?
Also, I was wondering: if Kryo doesn't do it well, could we potentially have defaulted back to the Java serializer?
we could actually, I had considered that. We would need a class-specific override. That might be better though, I'll look into changing this.
Changed it to use Java serialization. Let me test this on Etsy. I had this originally, can't remember why I switched.
This seems to work, so I'll stick with Java serialization!
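For readers following along, here is a minimal sketch of the pattern being settled on: keep Kryo as the Spark serializer, but register the one problematic class with Kryo's built-in JavaSerializer wrapper. The registrator name below is hypothetical; in this PR the real class is ChrononIcebergKryoRegistrator.

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.iceberg.gcp.gcs.GCSFileIO
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator illustrating the fallback: everything else stays on Kryo,
// but GCSFileIO (which drags in closures Kryo can't handle) goes through Java serialization.
class GcsFileIoFallbackRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[GCSFileIO], new JavaSerializer())
  }
}
```

The registrator is then wired in via `spark.kryo.registrator`, as in the config shown later in this PR.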
}

override def read(kryo: Kryo, input: Input, `type`: Class[GCSFileIO]): GCSFileIO = {
  val props = kryo.readObject(input, classOf[SerializableMap[String, String]])
😆
@transient private lazy val icebergCatalog: SparkCatalog = new SparkCatalog()
@transient private lazy val connectorCatalog: BigQueryCatalog = new BigQueryCatalog()

// Some stupid spark settings.
😆 are these required?
`catalogName` corresponds to `spark_catalog` in the config prop, and it is required for Spark to accurately route to this catalog extension. Let me add a comment.
We don't really use `defaultSessionCatalog` at all in the GCP situation; it is basically Spark's default HiveCatalogImpl.
Actually, I think there might be a more correct way to map the GCP primitives to these values. I will do that in a followup and add unit tests.
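To make the routing above concrete, here is a rough sketch of how the `spark_catalog` name gets bound to this extension. All keys and values below are taken from the config used elsewhere in this PR; building them into a SparkSession programmatically (rather than via additional-confs.yaml) is just for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Spark hands whatever name follows "spark.sql.catalog." to the plugin's initialize()
// as catalogName, so "spark_catalog" below is exactly what the comment thread refers to.
val spark = SparkSession
  .builder()
  .appName("catalog-routing-sketch")
  .config("spark.sql.catalog.spark_catalog",
          "ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog")
  .config("spark.sql.catalog.spark_catalog.catalog-impl",
          "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
  .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.io.ResolvingFileIO")
  .config("spark.sql.catalog.spark_catalog.gcp_project", "canary-443022")
  .config("spark.sql.catalog.spark_catalog.gcp_location", "us-central1")
  .config("spark.sql.catalog.spark_catalog.warehouse", "gs://zipline-warehouse-canary/data/tables/")
  .getOrCreate()
```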
override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean =
  icebergCatalog.dropNamespace(namespace, cascade)

override def listTables(namespace: Array[String]): Array[Identifier] = icebergCatalog.listTables(namespace)
for this method and the ones above it, do we eventually need to implement the bq catalog versions of them as fallbacks?
As far as I can tell, the Iceberg catalog will do regular BQ catalog stuff as intended, with the only difference being when it loads the table. It'll fail in `loadTable` if it's not an Iceberg table, so we try the other clients first before finally failing.
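A minimal sketch of the fallback order described here, with both catalogs typed as plain Spark `TableCatalog`s; the concrete types in the PR (Iceberg's SparkCatalog and the connector's BigQueryCatalog) and the surrounding delegation plumbing are assumed rather than reproduced.

```scala
import scala.util.Try
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}

// Try the Iceberg catalog first; if it can't load the identifier (e.g. a parquet
// external table or a BigQuery-native table), fall back to the BigQuery connector
// catalog, and only surface a failure if both lookups miss.
def loadTableWithFallback(iceberg: TableCatalog, bigQuery: TableCatalog, ident: Identifier): Table =
  Try(iceberg.loadTable(ident))
    .recover { case _ => bigQuery.loadTable(ident) }
    .get
```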
To confirm - we don't need any additional streaming tests as we tested this once together last week right? (Don't think I see any major updates deps wise since then?)
import org.apache.iceberg.gcp.gcs.GCSFileIO
import org.apache.iceberg.util.SerializableMap

class ChrononIcebergKryoRegistrator extends ChrononKryoRegistrator {
Still needed? Based on the description in the PR, you mentioned that it's not right?
This is still needed. We need to register Iceberg classes in Kryo so that we can write Iceberg. There is an edge case with GCSFileIO: it can't be Kryo-serialized (or I haven't gotten it to work, despite some internet research), so for just that class we will fall back to regular Java serialization.
I tested this on Etsy, will plan to roll this out and then test again today. I will roll back if needed.
Actionable comments posted: 0
🧹 Nitpick comments (4)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (1)
18-42: Confirm class registrations are needed. Remove any classes not actually used at runtime to reduce overhead.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)
55-56: Trivial type check test. Consider verifying more than just instance type for stronger coverage.
101-103: Avoid hardcoded date literal. Parameterize or generate this date to reduce future maintenance.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1)
90-91: Avoid null fields. Use Option or default values to prevent potential null pointer issues.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala
(1 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala
(1 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
(3 hunks)
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2024-11-26T19:47:53.900Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: non_spark_tests
- GitHub Check: bazel_config_tests
- GitHub Check: non_spark_tests
- GitHub Check: fetcher_tests
🔇 Additional comments (6)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (1)
16-16: Use JavaSerializer with caution. JavaSerializer is more universal but may be slower than Kryo. Confirm performance is acceptable.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)
142-157: Add resource cleanup in Kryo tests. Close the outputStream in a finally block to prevent leaks.
170-185: Same resource cleanup needed. Wrap output/input streams in try-finally or use a safer approach.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (3)
48-52: Check type before casting. Add require(internalTable.isInstanceOf[SupportsRead]) and require(internalTable.isInstanceOf[SupportsWrite]) to avoid runtime errors.
129-129: Handle null table. bigQueryClient.getTable(tId) can be null if table doesn't exist.
137-137: Validate URI modifications. If the regex doesn't match, URI remains unchanged. Log or fail on unexpected patterns.
Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]>
Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]>
Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]>
Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]>
Co-authored-by: Thomas Chow <[email protected]> Co-authored-by: Thomas Chow <[email protected]>
0252794
to
3281d24
Compare
Actionable comments posted: 1
♻️ Duplicate comments (4)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (4)
159-185
:⚠️ Potential issueSame resource leak in GCSFileIO test.
Apply same fix pattern as previous test.
132-157
:⚠️ Potential issueFix resource leaks in serialization test.
Missing proper stream closure in try/finally blocks.
- val outputStream = new ByteArrayOutputStream(); - val output = new Output(outputStream); - kryo.writeClassAndObject(output, original); - output.close(); + val outputStream = new ByteArrayOutputStream() + val output = new Output(outputStream) + try { + kryo.writeClassAndObject(output, original) + } finally { + output.close() + outputStream.close() + } - val inputStream = new ByteArrayInputStream(outputStream.toByteArray()); - val input = new Input(inputStream); - val deserializedObj = kryo.readClassAndObject(input); - input.close(); + val inputStream = new ByteArrayInputStream(outputStream.toByteArray()) + val input = new Input(inputStream) + var deserializedObj: Any = null + try { + deserializedObj = kryo.readClassAndObject(input) + } finally { + input.close() + inputStream.close() + }
132-157
: 🛠️ Refactor suggestionResource leak in ResolvingFileIO serialization test.
Input/output streams not properly closed in finally blocks.
- val outputStream = new ByteArrayOutputStream(); - val output = new Output(outputStream); - kryo.writeClassAndObject(output, original); - output.close(); + var outputStream: ByteArrayOutputStream = null + var output: Output = null + try { + outputStream = new ByteArrayOutputStream() + output = new Output(outputStream) + kryo.writeClassAndObject(output, original) + } finally { + if (output != null) output.close() + } - val inputStream = new ByteArrayInputStream(outputStream.toByteArray()); - val input = new Input(inputStream); - val deserializedObj = kryo.readClassAndObject(input); - input.close(); + var inputStream: ByteArrayInputStream = null + var input: Input = null + var deserializedObj: Any = null + try { + inputStream = new ByteArrayInputStream(outputStream.toByteArray()) + input = new Input(inputStream) + deserializedObj = kryo.readClassAndObject(input) + } finally { + if (input != null) input.close() + }
159-185
: 🛠️ Refactor suggestionSimilar resource leak in GCSFileIO serialization test.
Same issue as previous test.
- val outputStream = new ByteArrayOutputStream(); - val output = new Output(outputStream); - kryo.writeClassAndObject(output, original); - output.close(); + var outputStream: ByteArrayOutputStream = null + var output: Output = null + try { + outputStream = new ByteArrayOutputStream() + output = new Output(outputStream) + kryo.writeClassAndObject(output, original) + } finally { + if (output != null) output.close() + } - val inputStream = new ByteArrayInputStream(outputStream.toByteArray()); - val input = new Input(inputStream); - val deserializedObj = kryo.readClassAndObject(input); - input.close(); + var inputStream: ByteArrayInputStream = null + var input: Input = null + var deserializedObj: Any = null + try { + inputStream = new ByteArrayInputStream(outputStream.toByteArray()) + input = new Input(inputStream) + deserializedObj = kryo.readClassAndObject(input) + } finally { + if (input != null) input.close() + }
🧹 Nitpick comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
112-118
: Add assertions for insertPartitions result.Verify table contents after insert.
tableUtils.insertPartitions( df, "data.tchow_test_iceberg", Map( "file_format" -> "PARQUET", "table_type" -> "iceberg"), List("ds"), - saveMode = SaveMode.Overwrite) + saveMode = SaveMode.Overwrite) + + // Verify insertion succeeded + val insertedData = tableUtils.loadTable("data.tchow_test_iceberg") + assertNotNull(insertedData) + assertTrue(insertedData.count() > 0)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1)
150-150
: Offer help with partitioningThe TODO suggests partition support is missing.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
⛔ Files ignored due to path filters (1)
cloud_gcp/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar
is excluded by!**/*.jar
📒 Files selected for processing (16)
.github/workflows/require_triggered_status_checks.yaml
(1 hunks)cloud_gcp/BUILD.bazel
(3 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala
(1 hunks)cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala
(1 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
(3 hunks)cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
(0 hunks)flink/BUILD.bazel
(2 hunks)online/BUILD.bazel
(2 hunks)spark/BUILD.bazel
(2 hunks)spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala
(3 hunks)spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
(1 hunks)spark_install.json
(10 hunks)tools/build_rules/dependencies/maven_repository.bzl
(5 hunks)tools/build_rules/dependencies/scala_2_12_repository.bzl
(1 hunks)tools/build_rules/dependencies/scala_2_13_repository.bzl
(1 hunks)tools/build_rules/dependencies/spark_repository.bzl
(1 hunks)
💤 Files with no reviewable changes (1)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
🚧 Files skipped from review as they are similar to previous changes (10)
- flink/BUILD.bazel
- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
- .github/workflows/require_triggered_status_checks.yaml
- tools/build_rules/dependencies/spark_repository.bzl
- tools/build_rules/dependencies/scala_2_13_repository.bzl
- tools/build_rules/dependencies/scala_2_12_repository.bzl
- spark/BUILD.bazel
- online/BUILD.bazel
- spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala
- tools/build_rules/dependencies/maven_repository.bzl
🧰 Additional context used
🧠 Learnings (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2024-11-26T19:47:53.900Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: streaming_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: non_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: analyzer_tests
- GitHub Check: non_spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (29)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (7)
11-16: Serialization fallback explanation is clear. The comment effectively explains the serialization issues with GCSFileIO closures.
18-41: Complete class list for Iceberg serialization. List covers essential Iceberg classes for proper serialization.
7-44: ❓ Verification inconclusive
Consider integration tests with actual Iceberg operations.
Add tests verifying this registrator works with real Iceberg operations.
🏁 Script executed:
#!/bin/bash
# Check for integration tests that use ChrononIcebergKryoRegistrator
rg -l "ChrononIcebergKryoRegistrator" --type scala
Length of output: 230
Action: Enhance Integration Testing
It appears that BigQueryCatalogTest.scala references the registrator, but it's not clear if it covers real Iceberg operations. Please add or verify an integration test that explicitly triggers Iceberg operations on GCS (e.g., writing/reading data) to ensure that the custom registrations, in particular for GCSFileIO, work as expected.
1-6: Imports look good. Proper import structure for Kryo and Iceberg components.
7-17: Good fallback to JavaSerializer for GCSFileIO. Using JavaSerializer for GCSFileIO addresses closure serialization issues. Clear documentation of the problem.
18-42: Comprehensive class registration for Iceberg components. The class list covers essential Iceberg components needed for serialization.
43-44: Efficient registration implementation. Well-structured method with proper use of superclass initialization.
cloud_gcp/BUILD.bazel (2)
1-40
: Good dependency consolidation.Shared deps pattern improves maintainability.
72-75
: Local jar dependency is used appropriately.This approach is correct for the local iceberg-bigquery-catalog jar.
spark_install.json (10)
3-4
: Artifact Hash Update. Updated artifact hashes; ensure alignment with new dependency resolutions.
703-708
: Servlet API Update. Shasums and version updated for servlet-api; appears correct.
2294-2326
: Dependency Update. New dependencies (e.g., guice and jersey modules) added under hadoop-yarn-common; verify necessity.
2331-2339
: Jackson Modules. Minor reordering/update of Jackson modules; benign change.
2441-2450
: Hive Shim Update. Updated hive-shims-common and scheduler; dependency chain appears intact.
2523-2529
: Hive Shims Expansion. Inclusion of hive-shims-scheduler in hive-shims is noted; check compatibility.
2965-2971
: Twill Dependency. Added twill-discovery-api dependency; concise and clear update.
8046-8052
: Jasper Runtime. Jasper-runtime updated; confirm artifact hash consistency.
8595-8601
: Jasper Runtime Duplicate. Repeated update to jasper-runtime; ensure consistency across entries.
9653-9659
: Jasper Sources. Updated jasper-runtime:jar:sources; verify this change is intentional.cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (6)
49-49: Add safe-cast requirement. Casting to SupportsRead can fail at runtime.
52-52: Add safe-cast requirement. Casting to SupportsWrite can fail at runtime.
89-92: Avoid null fields. Use Option or defaults to prevent NPE risk.
129-129: Check for null. getTable can return null, leading to an NPE.
131-138: Validate URI pattern. replaceAll may silently do nothing if no match.
164-164: Verify catalog fallback. If defaultSessionCatalog
is null or not a TableCatalog, fallback may fail.cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (4)
3-26
: Imports organized properly.Appropriate imports for testing Kryo serialization with Iceberg components.
54-55
: Type check improved.Using wildcard type parameter is more flexible.
36-49
:✅ Verification successful
Configuration commented out for local testing.
Local integration test configuration is commented out. Uncomment when needed.
🏁 Script executed:
#!/bin/bash
# Check if the configuration values exist in environment variables
echo "Checking environment variables for local testing..."
for var in WAREHOUSE_PATH GCP_LOCATION GCP_PROJECT; do
  if [ -z "${!var}" ]; then
    echo "Warning: $var is not set"
  fi
done
Length of output: 546
Action: Leave local integration configuration commented out until needed.
The configuration in cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (lines 36–49) is intentionally commented out for local testing. Verification confirms that the required environment variables (WAREHOUSE_PATH, GCP_LOCATION, GCP_PROJECT) are not set.
- When running local integration tests, set these environment variables and uncomment the configuration.
- Otherwise, no action is required.
99-130
:❓ Verification inconclusive
Integration test needs verification.
Assertions on partitions look good, but test is ignored. Consider enabling when needed.
🏁 Script executed:
#!/bin/bash
# Check partitioning columns in BigQuery
echo "Checking for partitioning column information..."
fd . -t f -e scala | xargs grep -l "partitioning" | xargs grep -A 5 "partitioning"
Length of output: 8263
Integration Test Pending Stub Implementation
- Assertions on partitioning are correct.
- Test remains ignored due to live BigQuery dependency.
- Re-enable once stubs/mock support is added.
// "spark.hadoop.google.cloud.auth.service.account.enable" -> false.toString, | ||
// "spark.sql.catalog.spark_catalog.catalog-impl" -> classOf[BQMSCatalog].getName, | ||
// "spark.sql.catalog.spark_catalog" -> classOf[DelegatingBigQueryMetastoreCatalog].getName, | ||
// "spark.sql.catalog.spark_catalog.io-impl" -> classOf[ResolvingFileIO].getName, | ||
// "spark.sql.defaultUrlStreamHandlerFactory.enabled" -> false.toString, | ||
// "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/", | ||
// "spark.sql.catalog.spark_catalog.gcp_location" -> "us-central1", | ||
// "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022", | ||
// "spark.sql.catalogImplementation" -> "in-memory", | ||
// "spark.kryo.registrator" -> classOf[ChrononIcebergKryoRegistrator].getName, |
💡 Verification agent
🧩 Analysis chain
Commented config needs environment variables.
Uncomment for local testing and set required environment variables.
🏁 Script executed:
#!/bin/bash
# Check for testing environment variables
echo "Required environment variables for local testing:"
echo "WAREHOUSE_PATH, GCP_LOCATION, GCP_PROJECT"
env | grep -E "WAREHOUSE_PATH|GCP_LOCATION|GCP_PROJECT" || echo "No environment variables set"
Length of output: 326
Action: Configure Local Testing Environment
- The config lines remain commented out.
- Local tests require the environment variables: WAREHOUSE_PATH, GCP_LOCATION, and GCP_PROJECT.
- Please uncomment the config in cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (lines 40–49) and set these variables.
## Summary - Modifying TableUtils to use spark's built-in catalog wherever applicable. This will both improve performance and allow us to access iceberg tables through the bigquery catalog, since its built on #393. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced external checkout queries now deliver additional purchase details for richer data insights. - **Refactor** - Updated cloud integrations to enforce supported table formats and provide clearer error messaging. - Streamlined table creation and partitioning processes in Spark for improved performance and maintainability. - Standardized collection types for partitioning and sorting parameters in Dataframe operations. - **Tests** - Refined test cases to validate dynamic provider handling and verify consistent table operations. - Removed outdated test case for field name retrieval, reflecting changes in validation focus. - Updated assertions to utilize the new format provider access method. <!-- end of auto-generated comment: release notes by coderabbit.ai --> <!-- av pr metadata This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR. ``` {"parent":"main","parentHead":"","trunk":"main"} ``` --> --------- Co-authored-by: Thomas Chow <[email protected]>
## Summary

Large PR here. This is the first part to supporting iceberg using bigquery as a metastore. There are a few components to this:

1. `DelegatingBigQueryMetastoreCatalog` is the main actor in all of this. This abstraction wraps the iceberg bigquery catalog that we've introduced through a local jar download. The reason for wrapping it instead of simply using it is so that we can allow it to handle non-iceberg tables in both sql and non-sql spark contexts. This is useful for reading Etsy's beacon datasets, which are simply parquet external tables, as well as their CDC streams, which are bigquery native tables.
2. `GCSFileIOSerializer` is a simple wrapper that uses regular java serialization instead of kryo to handle the GCSFileIO, since Kryo doesn't handle closure serialization very well. I had added a few classes into the kryo registrator for serializing closures but it still doesn't seem to work in an actual spark job. I ultimately had to fall back to regular java serialization, but since this is for just one class that's not on the hotpath it should be fine.
3. Some serialization unit tests.
4. Lots of jar wrangling to get things to work in the right way. We'll have to make sure this doesn't break the streaming / fetching side of things as well.

Things to note are:

1. Had to submit a patch to the spark bigquery connector code GoogleCloudDataproc/spark-bigquery-connector#1340 because the connector does not support three-part namespacing. As such, you can only query tables that belong to the same project within a single sql query until the above patch is in.
2. You can only write iceberg tables to the currently configured project. The project is configured using `additional-confs.yaml` and I used the following config set to test this behavior:

```yaml
spark.chronon.table.format_provider.class: "ai.chronon.integrations.cloud_gcp.GcpFormatProvider"
spark.chronon.partition.format: "yyyy-MM-dd"
spark.chronon.table.gcs.temporary_gcs_bucket: "zipline-warehouse-canary"
spark.chronon.partition.column: "ds"
spark.chronon.table.gcs.connector_output_dataset: "data"
spark.chronon.table.gcs.connector_output_project: "canary-443022"
spark.chronon.coalesce.factor: "10"
spark.default.parallelism: "10"
spark.sql.shuffle.partitions: "10"
spark.chronon.table_write.prefix: "gs://zipline-warehouse-canary/data/tables/"
spark.sql.catalog.spark_catalog.warehouse: "gs://zipline-warehouse-canary/data/tables/"
spark.sql.catalog.spark_catalog.gcp_location: "us-central1"
spark.sql.catalog.spark_catalog.gcp_project: "canary-443022"
spark.sql.catalog.spark_catalog.catalog-impl: org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
spark.sql.catalog.spark_catalog: ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog
spark.sql.catalog.spark_catalog.io-impl: org.apache.iceberg.io.ResolvingFileIO
spark.sql.defaultUrlStreamHandlerFactory.enabled: "false"
spark.kryo.registrator: "ai.chronon.integrations.cloud_gcp.GCPKryoRegistrator"
```

3. I had to remove https://github.com/zipline-ai/infrastructure/blob/e30ae1470d4568e3ae2dab384c4d8971dac973c9/base-gcp/dataproc.tf#L121 from the cluster because it conflicts with the metastore and connector jar being brought in here as dependencies. We'll need to rebuild our clusters (and the ones on Etsy) _without_ the jar cc @chewy-zlai
4. Also made a change to the canary branch zipline-ai/canary-confs@65fac34 to remove the project ID. This is not supported using the catalogs we have.
The configured project `spark.sql.catalog.spark_catalog.gcp_project` is taken into account across the board. ## Checklist - [x] Added Unit Tests - [ ] Covered by existing CI - [x] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced enhanced catalog management for seamless integration between BigQuery and Iceberg. - Added custom serialization logic to improve stability and performance. - **Refactor / Dependency Updates** - Streamlined dependency management and updated various library versions for smoother runtime behavior. - Refined Spark session configuration to ensure consistent application of settings. - Added new dependencies related to Hadoop client API. - **Tests** - Expanded integration tests for BigQuery functionality and Kryo serialization. - Removed obsolete test cases to focus on relevant validation. - **CI/CD** - Updated workflow triggers to activate on push events for improved integration responsiveness. <!-- end of auto-generated comment: release notes by coderabbit.ai --> <!-- av pr metadata This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR. ``` {"parent":"main","parentHead":"","trunk":"main"} ``` --> --------- Co-authored-by: Thomas Chow <[email protected]>
## Summary - Modifying TableUtils to use spark's built-in catalog wherever applicable. This will both improve performance and allow us to access iceberg tables through the bigquery catalog, since its built on #393. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced external checkout queries now deliver additional purchase details for richer data insights. - **Refactor** - Updated cloud integrations to enforce supported table formats and provide clearer error messaging. - Streamlined table creation and partitioning processes in Spark for improved performance and maintainability. - Standardized collection types for partitioning and sorting parameters in Dataframe operations. - **Tests** - Refined test cases to validate dynamic provider handling and verify consistent table operations. - Removed outdated test case for field name retrieval, reflecting changes in validation focus. - Updated assertions to utilize the new format provider access method. <!-- end of auto-generated comment: release notes by coderabbit.ai --> <!-- av pr metadata This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR. ``` {"parent":"main","parentHead":"","trunk":"main"} ``` --> --------- Co-authored-by: Thomas Chow <[email protected]>
## Summary Large PR here. This is the first part to supporting iceberg using bigquery as a metastore. There are a few components to this: 1. `DelegatingBigQueryMetastoreCatalog` is the main actor in all of this. This abstraction wraps the iceberg bigquery catalog that we've introduced through a local jar download. The reason for wrapping it instead of simply using it is so that we can allow it to handle non-iceberg tables in both sql and non-sql spark contexts. This is useful for reading Etsy's beacon datasets which are simply parquet external tables, as well as their CDC streams that are bigquery native tables. 2. `GCSFileOSerializer` is a simple wrapper that uses regular java serialization instead of kryo to handle the GCSFileIO, since Kryo doesn't handle closure serialization very well. I had added a few classes into the kryo registrator for serializing closures but it still doesn't seem to work in an actual spark job. I ultimately had to fall back to regular java serialization, but since this is for just one class that's not on the hotpath it should be fine. 3. Some serialization unit tests. 4. Lots of jar wrangling to get things to work in the right way. We'll have to make sure this doesn't break the streaming / fetching side of things as well. Things to note are: 1. Had to submit a patch to the spark bigquery connector code GoogleCloudDataproc/spark-bigquery-connector#1340 because the connector does not support three-part namespacing. As such, you can only query tables that belong to the same project within a single sql query until the above patch is in. 2. You can only write icberg tables to the currently configured project. The project is configured using `additional-confs.yaml` and I used the following config set to test this behavior: ```yaml spark.chronon.table.format_provider.class: "ai.chronon.integrations.cloud_gcp.GcpFormatProvider" spark.chronon.partition.format: "yyyy-MM-dd" spark.chronon.table.gcs.temporary_gcs_bucket: "zipline-warehouse-canary" spark.chronon.partition.column: "ds" spark.chronon.table.gcs.connector_output_dataset: "data" spark.chronon.table.gcs.connector_output_project: "canary-443022" spark.chronon.coalesce.factor: "10" spark.default.parallelism: "10" spark.sql.shuffle.partitions: "10" spark.chronon.table_write.prefix: "gs://zipline-warehouse-canary/data/tables/" spark.sql.catalog.spark_catalog.warehouse: "gs://zipline-warehouse-canary/data/tables/" spark.sql.catalog.spark_catalog.gcp_location: "us-central1" spark.sql.catalog.spark_catalog.gcp_project: "canary-443022" spark.sql.catalog.spark_catalog.catalog-impl: org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog spark.sql.catalog.spark_catalog: ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog spark.sql.catalog.spark_catalog.io-impl: org.apache.iceberg.io.ResolvingFileIO spark.sql.defaultUrlStreamHandlerFactory.enabled: "false" spark.kryo.registrator: "ai.chronon.integrations.cloud_gcp.GCPKryoRegistrator" ``` 3. I had to remove https://github.com/zipline-ai/infrastructure/blob/e30ae1470d4568e3ae2dab384c4d8971dac973c9/base-gcp/dataproc.tf#L121 from the cluster because it conflicts with the metastore and connector jar being brought in here as dependencies. We'll need to rebuild our clusters (and the ones on Etsy) _without_ the jar cc @chewy-zlai 4. Also made a change to the canary branch zipline-ai/canary-confs@65fac34 to remove the project ID. This is not supported using the catalogs we have. 
## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [x] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit
- **New Features**
  - Introduced enhanced catalog management for seamless integration between BigQuery and Iceberg.
  - Added custom serialization logic to improve stability and performance.
- **Refactor / Dependency Updates**
  - Streamlined dependency management and updated various library versions for smoother runtime behavior.
  - Refined Spark session configuration to ensure consistent application of settings.
  - Added new dependencies related to the Hadoop client API.
- **Tests**
  - Expanded integration tests for BigQuery functionality and Kryo serialization.
  - Removed obsolete test cases to focus on relevant validation.
- **CI/CD**
  - Updated workflow triggers to activate on push events for improved integration responsiveness.

Co-authored-by: Thomas Chow <[email protected]>
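As a companion to component 2 above, here is a hedged sketch of how a Java-serialization fallback for `GCSFileIO` can be wired through a Kryo registrator. It uses Kryo's built-in `JavaSerializer` to stand in for the PR's dedicated wrapper class, and the class and package names are assumptions rather than the actual implementation.

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.iceberg.gcp.gcs.GCSFileIO // assumed package path for Iceberg's GCS FileIO
import org.apache.spark.serializer.KryoRegistrator

// Sketch: keep Kryo for everything else, but push GCSFileIO through plain Java
// serialization, which copes with the closures that trip up Kryo.
class GcpKryoRegistratorSketch extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit =
    kryo.register(classOf[GCSFileIO], new JavaSerializer())
}
```

Spark picks the registrator up via the `spark.kryo.registrator` setting shown in the config above.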