feat: bigquery catalog with iceberg support #393


Merged · 5 commits · Mar 5, 2025

Conversation

tchow-zlai (Collaborator) commented Feb 17, 2025

Summary

This is a large PR, and it is the first part of supporting Iceberg using BigQuery as a metastore. There are a few components to this:

  1. DelegatingBigQueryMetastoreCatalog is the main actor in all of this. The abstraction wraps the Iceberg BigQuery catalog that we've introduced through a local jar download. The reason for wrapping it, instead of simply using it directly, is so that it can also handle non-Iceberg tables in both SQL and non-SQL Spark contexts. This is useful for reading Etsy's beacon datasets, which are simply Parquet external tables, as well as their CDC streams, which are BigQuery native tables. (A minimal sketch of the fallback pattern appears right after this list.)
  2. GCSFileIOSerializer is a simple wrapper that uses regular Java serialization instead of Kryo to handle the GCSFileIO, since Kryo doesn't handle closure serialization very well. I had added a few classes to the Kryo registrator for serializing closures, but it still didn't work in an actual Spark job. I ultimately had to fall back to regular Java serialization, but since this is for just one class that isn't on the hot path, it should be fine. (A registration sketch also follows the list.)
  3. Some serialization unit tests.
  4. Lots of jar wrangling to get things to work in the right way. We'll have to make sure this doesn't break the streaming / fetching side of things as well.
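
For item 1, here is a minimal sketch of the try-Iceberg-then-fall-back delegation idea, written against Spark's TableCatalog API. The names FallbackLoadSketch, loadBigQueryNativeTable, and defaultSessionCatalog are illustrative stand-ins, not the actual members of DelegatingBigQueryMetastoreCatalog:

import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}

import scala.util.Try

// Sketch only: try the Iceberg BigQuery-metastore catalog first; if that fails,
// fall back to a loader for non-Iceberg tables (Parquet external tables, BigQuery
// native tables); finally defer to the default session catalog.
class FallbackLoadSketch(
    icebergCatalog: TableCatalog,
    defaultSessionCatalog: TableCatalog,
    loadBigQueryNativeTable: Identifier => Table // hypothetical helper
) {
  def loadTable(ident: Identifier): Table =
    Try(icebergCatalog.loadTable(ident))
      .orElse(Try(loadBigQueryNativeTable(ident)))
      .getOrElse(defaultSessionCatalog.loadTable(ident))
}

For item 2, one common way to force plain Java serialization for a single Kryo-unfriendly class is to register Kryo's built-in JavaSerializer for it. This is only a sketch of that idea; the import path for GCSFileIO is assumed, and the real serializer in this PR may be implemented differently:

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.iceberg.gcp.gcs.GCSFileIO // assumed location of GCSFileIO

object GcsFileIoRegistrationSketch {
  // Register GCSFileIO with Java serialization so Kryo never tries to walk its
  // internal closures.
  def register(kryo: Kryo): Unit =
    kryo.register(classOf[GCSFileIO], new JavaSerializer())
}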

Things to note are:

  1. Had to submit a patch to the Spark BigQuery connector (fix: respect project id for incoming identifier, GoogleCloudDataproc/spark-bigquery-connector#1340) because the connector does not support three-part namespacing. Until that patch lands, you can only query tables that belong to the same project within a single SQL query.
  2. You can only write Iceberg tables to the currently configured project. The project is configured using additional-confs.yaml, and I used the following config set to test this behavior:
spark.chronon.table.format_provider.class: "ai.chronon.integrations.cloud_gcp.GcpFormatProvider"
spark.chronon.partition.format: "yyyy-MM-dd"
spark.chronon.table.gcs.temporary_gcs_bucket: "zipline-warehouse-canary"
spark.chronon.partition.column: "ds"
spark.chronon.table.gcs.connector_output_dataset: "data"
spark.chronon.table.gcs.connector_output_project: "canary-443022"
spark.chronon.coalesce.factor: "10"
spark.default.parallelism: "10"
spark.sql.shuffle.partitions: "10"
spark.chronon.table_write.prefix: "gs://zipline-warehouse-canary/data/tables/"
spark.sql.catalog.spark_catalog.warehouse: "gs://zipline-warehouse-canary/data/tables/"
spark.sql.catalog.spark_catalog.gcp_location: "us-central1"
spark.sql.catalog.spark_catalog.gcp_project: "canary-443022"
spark.sql.catalog.spark_catalog.catalog-impl: org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
spark.sql.catalog.spark_catalog: ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog
spark.sql.catalog.spark_catalog.io-impl: org.apache.iceberg.io.ResolvingFileIO
spark.sql.defaultUrlStreamHandlerFactory.enabled: "false"
spark.kryo.registrator: "ai.chronon.integrations.cloud_gcp.GCPKryoRegistrator"
  3. I had to remove https://github.com/zipline-ai/infrastructure/blob/e30ae1470d4568e3ae2dab384c4d8971dac973c9/base-gcp/dataproc.tf#L121 from the cluster because it conflicts with the metastore and connector jars being brought in here as dependencies. We'll need to rebuild our clusters (and the ones on Etsy) without the jar. cc @chewy-zlai
  4. Also made a change to the canary branch https://github.com/zipline-ai/cananry-confs/commit/65fac34f01d63698e29f586606eaa12569f6cfe0 to remove the project ID, since that is not supported with the catalogs we have. The configured project spark.sql.catalog.spark_catalog.gcp_project is taken into account across the board.

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced enhanced catalog management for seamless integration between BigQuery and Iceberg.
    • Added custom serialization logic to improve stability and performance.
  • Refactor / Dependency Updates

    • Streamlined dependency management and updated various library versions for smoother runtime behavior.
    • Refined Spark session configuration to ensure consistent application of settings.
    • Added new dependencies related to Hadoop client API.
  • Tests

    • Expanded integration tests for BigQuery functionality and Kryo serialization.
    • Removed obsolete test cases to focus on relevant validation.
  • CI/CD

    • Updated workflow triggers to activate on push events for improved integration responsiveness.

coderabbitai bot commented Feb 17, 2025

Walkthrough

The changes update dependency management and build configurations across multiple modules. A new variable shared_deps is introduced to streamline dependencies in cloud_gcp. Several build and Maven files have updated artifact hashes, dependency versions, and exclusions. Tests have been enhanced with new cases for Kryo serialization and partitioning, while deprecated tests are removed. New classes, including a catalog delegation class and a custom Kryo registrator for Iceberg, have been added. Workflow triggers and Spark configuration ordering are also modified.

Changes

File(s) Change Summary
cloud_gcp/BUILD.bazel Introduced shared_deps, updated deps, runtime_deps, and test_deps; added java_import for iceberg_bigquery_catalog_lib.
cloud_gcp/src/test/scala/.../BigQueryCatalogTest.scala
cloud_gcp/src/test/scala/.../DataprocSubmitterTest.scala
Reorganized imports, updated Spark partition config, added Kryo serialization tests, expanded partition assertions, and removed an outdated BigQuery compatibility test.
spark/src/main/scala/.../ChrononKryoRegistrator.scala
cloud_gcp/src/main/scala/.../ChrononIcebergKryoRegistrator.scala
spark/src/main/scala/.../SparkSessionBuilder.scala
Updated Kryo class registrations with new classes; added custom Iceberg serialization logic; reordered merged configuration application.
cloud_gcp/src/main/scala/.../DelegatingBigQueryMetastoreCatalog.scala New catalog class with methods to fallback from Iceberg to BigQuery for table operations.
maven_install.json
spark_install.json
Updated artifact hashes; added new Iceberg Spark Runtime artifact; removed obsolete dependencies.
tools/build_rules/dependencies/maven_repository.bzl
tools/build_rules/dependencies/spark_repository.bzl
tools/build_rules/dependencies/scala_2_12_repository.bzl
tools/build_rules/dependencies/scala_2_13_repository.bzl
Added/updated dependencies (e.g. objenesis, Hadoop, Iceberg, BigQuery); modified exclusions; removed deprecated Spark BigQuery dependencies.
.github/workflows/require_triggered_status_checks.yaml Changed workflow trigger from pull_request to push.
flink/BUILD.bazel
online/BUILD.bazel
Added Maven artifacts for hadoop-client-api and hadoop-common to both library and test dependency lists.

Sequence Diagram(s)

sequenceDiagram
    participant C as Client
    participant D as DelegatingCatalog
    participant I as IcebergCatalog
    participant B as BigQueryCatalog
    C->>D: loadTable(ident)
    D->>I: try loadTable(ident)
    alt Iceberg success
        I-->>D: Table
        D-->>C: Return Table
    else Failure
        D->>B: loadTable(ident)
        B-->>D: Table
        D-->>C: Return Table
    end

Suggested reviewers

  • piyush-zlai

Poem

Code shifts dance on silent screens,
Dependencies line up like dreams,
New catalogs bridge the divide,
With Kryo magic on our side,
Tests and builds now sing in tune,
Cheers as CodeRabbit soars to the moon!
🚀

Warning

Review ran into problems

🔥 Problems

GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.



Base automatically changed from tchow/spark-354 to main February 17, 2025 04:42
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryMetastoreCatalog.scala (1)

8-16: Add error handling.

Basic initialization looks good, but needs error handling.

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+   try {
      super.initialize(name, options)
      val vanillaIcebergCatalog = new SparkCatalog()
      vanillaIcebergCatalog.initialize(name, options)
      setDelegateCatalog(vanillaIcebergCatalog)
+   } catch {
+     case e: Exception => 
+       throw new RuntimeException(s"Failed to initialize BigQuery catalog: ${e.getMessage}", e)
+   }
  }
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between f9f8dba and 832b540.

⛔ Files ignored due to path filters (1)
  • cloud_gcp/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar is excluded by !**/*.jar
📒 Files selected for processing (6)
  • catalog/BUILD.bazel (1 hunks)
  • catalog/src/main/scala/ai/chronon/catalog/Catalog.scala (1 hunks)
  • cloud_gcp/BUILD.bazel (4 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryMetastoreCatalog.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (3 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: bazel_config_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)

3-21: LGTM!

Imports are well-organized and include necessary BigQuery and GCS dependencies.


101-113: LGTM!

Well-documented reason for ignoring the test with clear setup instructions.

tools/build_rules/dependencies/maven_repository.bzl (1)

63-63:

✅ Verification successful

Verify Iceberg version compatibility with Spark 3.5.


🌐 Web query:

What is the latest version of Apache Iceberg that is compatible with Spark 3.5?

💡 Result:

The latest version of Apache Iceberg compatible with Spark 3.5 is 1.8.0, as shown by:

  • The official Apache Iceberg releases page listing 1.8.0 as the current version[1]
  • Specific Spark 3.5 runtime JARs (for both Scala 2.12 and 2.13) included in the Iceberg 1.8.0 release artifacts[1][8]
  • Documentation examples explicitly using iceberg-spark-runtime-3.5_2.12:1.8.0 for integration[2]

This compatibility allows users to run commands like:

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0

Earlier versions (e.g., 1.6.x) also supported Spark 3.5 [9], but 1.8.0 is the current latest release as of February 2025.


Apache Iceberg 1.8.0 is verified compatible with Spark 3.5.

  • Confirmed per Apache releases and documentation.
  • No update is required.

Comment on lines 6 to 7
deps = [
],

⚠️ Potential issue

Add required dependencies.

Missing dependencies for Spark/Iceberg catalog functionality.

Add these dependencies:

    deps = [
+        maven_scala_artifact("org.apache.spark:spark-sql"),
+        maven_scala_artifact("org.apache.iceberg:iceberg-spark-runtime-3.5"),
    ],
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
deps = [
],
deps = [
maven_scala_artifact("org.apache.spark:spark-sql"),
maven_scala_artifact("org.apache.iceberg:iceberg-spark-runtime-3.5"),
],

@tchow-zlai tchow-zlai changed the base branch from main to tchow/revert-spark-354 February 17, 2025 06:03
Base automatically changed from tchow/revert-spark-354 to main February 17, 2025 22:17
@tchow-zlai tchow-zlai changed the title wip feat: introduce iceberg metastore jar Feb 22, 2025
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)

82-177: Potential overhead from registering many classes.
Remove any unneeded classes to keep overhead low.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 2889108 and 8de6087.

📒 Files selected for processing (2)
  • spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2024-11-26T19:47:53.900Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (1)

166-166: Consistent config application.
Re-adding merged configs at this point looks good.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)

39-41: ⚠️ Potential issue

Empty configuration values need to be set.

The following BigQuery metastore catalog settings are empty:

  • warehouse
  • gcp_location
  • gcp_project
🧹 Nitpick comments (5)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)

96-97: Single URI requirement
Requiring only one URI may limit multi-file external tables.


137-139: Potential concurrency issue
Storing the delegate in a mutable var might cause race conditions in multi-threaded contexts (a small thread-safety sketch appears after these nitpick comments).

cloud_gcp/BUILD.bazel (1)

89-89: Encourage added tests
Consider dedicated tests for BigQuery Catalog integration changes.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (1)

123-141: Remove commented code.

The old external table implementation is no longer needed.

-//      val baseTableDef = ExternalTableDefinition
-//        .newBuilder(dataGlob, FormatOptions.parquet())
-//        .setAutodetect(true)
-//
-//      if (partitionColumns.nonEmpty) {
-//        val timePartitioning = HivePartitioningOptions
-//          .newBuilder()
-//          .setFields(partitionColumns.toJava)
-//          .setSourceUriPrefix(path)
-//          .setMode("STRINGS")
-//          .build()
-//        baseTableDef.setHivePartitioningOptions(timePartitioning)
-//      }
-//
-//      val tableInfo = TableInfo.newBuilder(shadedTableId, baseTableDef.build).build()
-//      val createdTable = bigQueryClient.create(tableInfo)
-//
-//      println(s"Created external table ${createdTable.getTableId}")
-//
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)

136-139: Consider performance implications of SQL-based loading.

The switch to SQL-based table loading might have performance implications compared to direct loading.

Also applies to: 244-246, 745-747
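
On the mutable-delegate nitpick above, a hedged sketch of one way to make the handoff safe under concurrent access, assuming the delegate is assigned once during initialize(); the names here are illustrative rather than the actual fields of DelegatingBigQueryMetastoreCatalog:

import org.apache.spark.sql.connector.catalog.CatalogPlugin

class DelegateHolderSketch {
  // @volatile makes the assignment visible to readers on other threads.
  @volatile private var delegate: CatalogPlugin = _

  def setDelegate(catalog: CatalogPlugin): Unit = { delegate = catalog }

  def currentDelegate: CatalogPlugin =
    Option(delegate).getOrElse(
      throw new IllegalStateException("Catalog not initialized: call initialize() first"))
}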

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 5ee14e5 and a6f2656.

📒 Files selected for processing (9)
  • .github/workflows/require_triggered_status_checks.yaml (1 hunks)
  • cloud_gcp/BUILD.bazel (6 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (0 hunks)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (5 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (3 hunks)
  • tools/build_rules/dependencies/spark_repository.bzl (1 hunks)
💤 Files with no reviewable changes (1)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
🧰 Additional context used
🧠 Learnings (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (11)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
🔇 Additional comments (13)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)

21-27: Doc order differs from code
Doc suggests BigQuery→Iceberg→External, but code attempts Iceberg first.


80-110: Fallback handling
Ensure all exceptions are properly handled before calling defaultSessionCatalog.

tools/build_rules/dependencies/spark_repository.bzl (1)

38-39: Check excluded artifacts
Excluding Hadoop artifacts might cause missing classes if indirectly needed.

cloud_gcp/BUILD.bazel (2)

1-39: Version alignment check
Confirm these artifacts align correctly with Spark 3.5 and Hadoop components.


77-79: Local jar usage
No action needed since this is intentionally a local jar.

tools/build_rules/dependencies/maven_repository.bzl (2)

46-46: LGTM! Added Iceberg Spark runtime dependency.

The addition of Iceberg Spark runtime dependency aligns with the PR objective of introducing Iceberg BigQuery metastore catalog.


88-91:

❓ Verification inconclusive

Verify compatibility of BigQuery connector versions.

The downgrade of BigQuery connector dependencies from 3.0.3 to 2.2.26 needs verification.


🌐 Web query:

What is the latest stable version of Google Cloud BigQuery connector for Hadoop?

💡 Result:

The latest stable version of Google Cloud's BigQuery connector for Hadoop appears to be 3.0.4, released on November 12, 2024[8]. This update includes fixes for downscoping issues with gRPC and improvements for hierarchical namespace bucket operations[8].

For context:

  • Earlier releases (like 2.2.x series) addressed compatibility with different Hadoop versions[8]
  • The Spark BigQuery Connector (a related but separate component) reached 0.42.0 in January 2025 with Spark 3.5 support[3][7]
  • Maven repositories list older connector versions like 0.13.4-hadoop3 and hadoop3-1.2.0, but these are superseded by newer releases[9][10]

Key capabilities in recent versions include direct BigQuery Storage API integration (avoiding intermediate GCS files)[5], predicate/column filtering optimizations[3], and improved authentication handling[1][8].


Action: Verify BigQuery connector dependency versions

  • In tools/build_rules/dependencies/maven_repository.bzl (lines 88-91), dependencies were set to 2.2.26.
  • The latest stable BigQuery connector for Hadoop is 3.0.4.
  • Please confirm if the downgrade is intentional and compatible or if an update to 3.0.4 is needed.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2)

5-5: LGTM! Updated import for table creation status.

Import change aligns with the new Iceberg-based table creation approach.


110-123: LGTM! Implemented Iceberg table creation.

The new implementation correctly uses Iceberg format with proper partitioning support.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)

114-117: LGTM! Added catalog type verification test.

Test verifies correct catalog implementation.


123-149: LGTM! Added Kryo serialization test.

Comprehensive test for ResolvingFileIO serialization.

spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)

124-133: LGTM! Improved error handling.

Using Try for better error handling in tableReachable.

.github/workflows/require_triggered_status_checks.yaml (1)

4-4: Push trigger added.
Concise and clear; expands trigger coverage as needed.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)

112-138: Add proper resource cleanup.

Use try-with-resources to ensure proper cleanup of streams.

   it should "kryo serialization for ResolvingFileIO" in {
     val registrator = new ChrononKryoRegistrator()
     val kryo = new Kryo();
     kryo.setRegistrationRequired(true);
     kryo.setReferences(true);
     registrator.registerClasses(kryo)

     // Create an instance of ResolvingFileIO
     val original = new ResolvingFileIO();
     original.initialize(Map.empty[String, String].asJava)

     // Serialize the object
-    val outputStream = new ByteArrayOutputStream();
-    val output = new Output(outputStream);
-    kryo.writeClassAndObject(output, original);
-    output.close();
+    val outputStream = new ByteArrayOutputStream()
+    try {
+      val output = new Output(outputStream)
+      try {
+        kryo.writeClassAndObject(output, original)
+      } finally {
+        output.close()
+      }

     // Deserialize the object
-    val inputStream = new ByteArrayInputStream(outputStream.toByteArray());
-    val input = new Input(inputStream);
-    val deserializedObj = kryo.readClassAndObject(input);
-    input.close();
+      val inputStream = new ByteArrayInputStream(outputStream.toByteArray())
+      val input = new Input(inputStream)
+      try {
+        val deserializedObj = kryo.readClassAndObject(input)
+        assertNotNull("Deserialized object should not be null", deserializedObj)
+        assertTrue("Deserialized object should be an instance of ResolvingFileIO",
+                 deserializedObj.isInstanceOf[ResolvingFileIO])
+      } finally {
+        input.close()
+      }
+    } finally {
+      outputStream.close()
+    }
-    assertNotNull("Deserialized object should not be null", deserializedObj);
-    assertTrue("Deserialized object should be an instance of ResolvingFileIO",
-               deserializedObj.isInstanceOf[ResolvingFileIO]);
   }
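
As an alternative to the nested try/finally above, scala.util.Using can express the same cleanup more compactly while still propagating the first failure. This is only a sketch, assuming scala.util.Using is available on the build's Scala version (standard in 2.13; on 2.12 a small loan-pattern helper would be needed):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

import scala.util.Using

object KryoRoundTripSketch {
  // Serialize and deserialize a value, closing the Output/Input even on failure.
  def roundTrip(kryo: Kryo, original: AnyRef): AnyRef =
    Using.Manager { use =>
      val bytes = new ByteArrayOutputStream()
      val output = use(new Output(bytes))
      kryo.writeClassAndObject(output, original)
      output.flush() // push buffered bytes into the stream before reading back
      val input = use(new Input(new ByteArrayInputStream(bytes.toByteArray)))
      kryo.readClassAndObject(input)
    }.get
}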
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1)

28-38: Track the TODO for catalog delegation.

The comment indicates future work needed for proper catalog delegation.

Would you like me to create an issue to track this TODO?

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 67583c8 and a91c224.

📒 Files selected for processing (13)
  • .github/workflows/require_triggered_status_checks.yaml (1 hunks)
  • cloud_gcp/BUILD.bazel (6 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (0 hunks)
  • spark/BUILD.bazel (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4 hunks)
  • spark_install.json (12 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (4 hunks)
  • tools/build_rules/dependencies/spark_repository.bzl (1 hunks)
🔥 Files not summarized due to errors (1)
  • spark_install.json: Error: Server error: no LLM provider could handle the message
💤 Files with no reviewable changes (1)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
🚧 Files skipped from review as they are similar to previous changes (5)
  • spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
  • .github/workflows/require_triggered_status_checks.yaml
  • tools/build_rules/dependencies/spark_repository.bzl
  • cloud_gcp/BUILD.bazel
  • tools/build_rules/dependencies/maven_repository.bzl
🧰 Additional context used
🧠 Learnings (2)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2024-11-26T19:47:53.900Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
Learnt from: tchow-zlai
PR: zipline-ai/chronon#263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:56-57
Timestamp: 2025-01-24T23:55:40.650Z
Learning: For BigQuery table creation operations in BigQueryFormat.scala, allow exceptions to propagate directly without wrapping them in try-catch blocks, as the original BigQuery exceptions provide sufficient context.
⏰ Context from checks skipped due to timeout of 90000ms (17)
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: bazel_config_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (25)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)

3-19: LGTM!

Imports are well-organized and necessary for the new functionality.


39-41: Empty configuration values need to be set.

The following BigQuery metastore catalog settings are empty:

  • warehouse
  • gcp_location
  • gcp_project
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GCSFormat.scala (2)

110-123: LGTM! Clean transition to Iceberg tables.

The implementation correctly handles table creation and partitioning using Iceberg format.


12-12: LGTM! Required imports added.

The new imports support the Iceberg table implementation.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)

39-58: LGTM! Well-structured table delegation.

Clean implementation that properly delegates operations while preserving additional properties.


94-132: LGTM! Robust catalog implementation with proper fallback.

The loadTable method gracefully handles failures and falls back to BigQuery when needed.

spark/BUILD.bazel (1)

48-48: LGTM! Consistent dependency management.

Changed to maven_artifact to align with project's dependency resolution approach.

spark/src/main/scala/ai/chronon/spark/TableUtils.scala (3)

124-133: LGTM! Improved error handling pattern.

Using Try-match is more idiomatic Scala and improves readability.


136-139: LGTM! Simplified table access.

Direct SQL queries provide cleaner access to table data and schema.

Also applies to: 243-246


745-746: LGTM! Consistent table access pattern.

Using SQL query aligns with the changes in loadTable and getSchemaFromTable.

spark_install.json (12)

1-7: Hashes updated.
Auto-gen hash fields; verify new values.


1022-1031: Hadoop Auth update.
Hashes and version for hadoop-auth updated.


2381-2388: Curator-client deps.
Added zookeeper and slf4j entries; check necessity.


2431-2443: Yarn-common deps.
New guice and jersey modules added; looks good.


2448-2456: Jackson modules.
Adjusted jackson dependencies; approved.


2560-2564: Hive shims.
Added hive-shims-common; concise update.


2571-2577: Hive scheduler.
Included hive-shims-common in scheduler deps.


3072-3079: libthrift deps.
Added httpcore and slf4j; integration looks fine.


3111-3118: Zookeeper deps.
Added slf4j entries; ensure consistency.


8500-8506: Jasper runtime.
Reordered/dependency update; simple change.


9073-9079: Tomcat runtime.
Updated jasper-runtime dependency.


10149-10155: Jasper-runtime sources.
Source ref updated; approved.

spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (3)

118-119: LGTM! Iceberg classes added for BigQuery metastore.

The addition of Iceberg classes aligns with the PR objective of introducing Iceberg BigQuery metastore catalog.


99-102: LGTM! Java util classes added.

Common Java collection classes added for serialization optimization.


107-117:

❓ Verification inconclusive

Verify Hadoop FS classes usage.

Several Hadoop FS classes are registered. Ensure they are required for the Iceberg integration.

Also applies to: 166-167


🏁 Script executed:

#!/bin/bash
# Search for usage of Hadoop FS classes
rg "FileUtil|BlockLocation|FileStatus|Path|StorageType|FsAction|FsPermission" --type scala

Length of output: 35663


Hadoop FS Registrations & Iceberg Integration Verification

  • In ChrononKryoRegistrator.scala (lines 107–117 & 166–167), several Hadoop FS classes (e.g., BlockLocation, FileStatus, various FileUtil variants, etc.) are registered.
  • Grep results don’t reveal direct usage tied to Iceberg integration—usage seems isolated to this registrator.
  • Action: Please double-check if these registrations are essential for Iceberg support or if they can be removed.

"org.apache.spark.sql.types.Metadata",
"ai.chronon.api.Row",
"ai.chronon.spark.KeyWithHash",
"ai.chronon.aggregator.base.ApproxHistogramIr",
"ai.chronon.aggregator.base.MomentsIR",
"ai.chronon.aggregator.windowing.BatchIr",
"ai.chronon.aggregator.base.ApproxHistogramIr",
"ai.chronon.online.RowWrapper",
"ai.chronon.online.fetcher.Fetcher$Request",

🛠️ Refactor suggestion

Remove duplicate entry.

ai.chronon.online.fetcher.Fetcher$Request and ai.chronon.online.Fetcher$Request refer to the same class.

-      "ai.chronon.online.fetcher.Fetcher$Request",
       "ai.chronon.aggregator.windowing.FinalBatchIr",
       "ai.chronon.api.Row",
-      "ai.chronon.online.Fetcher$Request",

Also applies to: 90-90

"org.apache.spark.sql.types.Metadata",
"ai.chronon.api.Row",
"ai.chronon.spark.KeyWithHash",
"ai.chronon.aggregator.base.ApproxHistogramIr",
"ai.chronon.aggregator.base.MomentsIR",

🛠️ Refactor suggestion

Remove duplicate entry.

ai.chronon.aggregator.base.ApproxHistogramIr is registered twice.

-      "ai.chronon.aggregator.base.ApproxHistogramIr",
       "ai.chronon.aggregator.base.MomentsIR",
       "ai.chronon.aggregator.windowing.BatchIr",
-      "ai.chronon.aggregator.base.ApproxHistogramIr",

Also applies to: 85-85

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between a91c224 and f851f29.

📒 Files selected for processing (6)
  • cloud_gcp/BUILD.bazel (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
  • spark/BUILD.bazel (2 hunks)
  • spark_install.json (10 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (4 hunks)
  • tools/build_rules/dependencies/spark_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • tools/build_rules/dependencies/spark_repository.bzl
  • spark/BUILD.bazel
  • tools/build_rules/dependencies/maven_repository.bzl
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (15)
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (16)
cloud_gcp/BUILD.bazel (4)

1-39: LGTM! Dependencies consolidated effectively.

The shared_deps variable improves maintainability by centralizing dependency management.


49-56: LGTM! Binary configuration looks good.

The runtime dependencies are properly specified.


77-81: LGTM! Local jar configuration is correct.

Based on the past review comments and learnings, this is intentionally a local jar dependency.


41-47:

❓ Verification inconclusive

Verify format=True impact.

Ensure the format flag doesn't conflict with existing formatters.


🏁 Script executed:

#!/bin/bash
# Check for other formatting tools in the project
fd -t f '\.scalafmt\.conf|\.scalafix\.conf|\.scalastyle|\.scalafmt'

Length of output: 69


Action: Verify Scala formatter config absence manually.

  • In cloud_gcp/BUILD.bazel (lines 41–47), format=True is set.
  • The search script showed no formatter configurations (e.g., .scalafmt.conf, .scalafix.conf, .scalastyle), but please manually verify their absence to ensure no conflicts.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)

3-19: LGTM! Imports are well organized.

The new imports support the added Kryo serialization test.


112-138: LGTM! Kryo serialization test is thorough.

The test properly validates serialization and deserialization of ResolvingFileIO.

spark_install.json (10)

3-4: Hash update validation.
Artifact hash values updated to positive integers. Verify these values align with the new resolution process.


2308-2329: Dependency adjustments.
Added "org.slf4j:slf4j-api" in curator-client and "io.netty:netty" for hadoop-mapreduce-client-core. Confirm these align with your dependency strategy.


2456-2465: Hive shim update.
Included "commons-lang:commons-lang" in hive-shims-common and linked hive-shims-scheduler to it. Looks correct.


2515-2522: OpenCSV dependency added.
"net.sf.opencsv:opencsv" now appears under hive-serde.


2529-2535: Hive shims scheduler inclusion.
"org.apache.hive.shims:hive-shims-scheduler" has been added to the hive-shims list.


2948-2954: Libthrift enhancements.
Added "org.apache.httpcomponents:httpcore" and ensured "org.slf4j:slf4j-api" is present. Verify versions and compatibility.


2972-2978: Twill discovery update.
Added "org.apache.twill:twill-discovery-api" under twill-discovery-core.


8139-8145: Jasper runtime update.
Updated "tomcat:jasper-runtime"; ensure consistency with related dependency listings.


8692-8698: Jasper runtime consistency.
Reaffirms the "tomcat:jasper-runtime" entry.


9768-9774: Jasper runtime JAR source.
Updated "tomcat:jasper-runtime:jar:sources". Looks good.

Comment on lines 34 to 48
"spark.sql.catalog.spark_catalog.catalog-impl" -> classOf[BQMSCatalog].getName,
"spark.sql.catalog.spark_catalog" -> classOf[DelegatingBigQueryMetastoreCatalog].getName,
"spark.sql.catalog.spark_catalog.io-impl" -> classOf[ResolvingFileIO].getName,
"spark.sql.defaultUrlStreamHandlerFactory.enabled" -> false.toString,
// set the following
"spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
"spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
"spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",

🛠️ Refactor suggestion

Hardcoded configuration values should be externalized.

Move warehouse, location, and project values to configuration files.

-        "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
-        "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
-        "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
+        "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
+        "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
+        "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"spark.sql.catalog.spark_catalog.catalog-impl" -> classOf[BQMSCatalog].getName,
"spark.sql.catalog.spark_catalog" -> classOf[DelegatingBigQueryMetastoreCatalog].getName,
"spark.sql.catalog.spark_catalog.io-impl" -> classOf[ResolvingFileIO].getName,
"spark.sql.defaultUrlStreamHandlerFactory.enabled" -> false.toString,
// set the following
"spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
"spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
"spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
"spark.sql.catalog.spark_catalog.catalog-impl" -> classOf[BQMSCatalog].getName,
"spark.sql.catalog.spark_catalog" -> classOf[DelegatingBigQueryMetastoreCatalog].getName,
"spark.sql.catalog.spark_catalog.io-impl" -> classOf[ResolvingFileIO].getName,
"spark.sql.defaultUrlStreamHandlerFactory.enabled" -> false.toString,
// set the following
"spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
"spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
"spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (3)

94-132: Simplify error handling in loadTable.

The nested match expressions and error handling make the code hard to follow.

  override def loadTable(ident: Identifier): Table = {
-    Try { icebergCatalog.loadTable(ident) }
-      .recover {
-        case _ => {
-          val tId = ident.namespace().toList match {
+    def loadBigQueryTable: Table = {
+      val tId = ident.namespace().toList match {
           case database :: Nil            => TableId.of(database, ident.name())
           case project :: database :: Nil => TableId.of(project, database, ident.name())
-          }
-          val table = bigQueryClient.getTable(tId)
+        case _ => throw new IllegalArgumentException(s"Invalid namespace: ${ident.namespace().mkString(".")}")
+      }
+      val table = bigQueryClient.getTable(tId)
+      table.getDefinition match {
+        case externalTable: ExternalTableDefinition => loadExternalTable(tId, externalTable)
+        case _: StandardTableDefinition => loadStandardTable(ident)
+        case _ => throw new IllegalStateException(s"Unsupported table type: ${table.getFriendlyName}")
+      }
+    }
+    
+    Try(icebergCatalog.loadTable(ident))
+      .orElse(Try(loadBigQueryTable))
+      .getOrElse(defaultSessionCatalog.asInstanceOf[TableCatalog].loadTable(ident))
+  }

104-122: Improve URI handling for external tables.

The URI handling is fragile and could fail with different URI patterns.

-              val uri = scala
-                .Option(externalTable.getHivePartitioningOptions)
-                .map(_.getSourceUriPrefix)
-                .getOrElse {
-                  require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
-                  uris.head.replaceAll("/\\*\\.parquet$", "")
-                }
+              val uri = scala.Option(externalTable.getHivePartitioningOptions)
+                .map(_.getSourceUriPrefix)
+                .getOrElse {
+                  require(uris.nonEmpty, s"External table ${table} must have at least one URI")
+                  val baseUri = uris.head.replaceAll("/\\*\\.parquet$", "")
+                  require(uris.tail.forall(_.startsWith(baseUri)), 
+                    s"All URIs must share the same prefix: $baseUri")
+                  baseUri
+                }

28-30: Address TODO comment.

The TODO comment indicates missing functionality for proper catalog delegation.

Would you like me to help create unit tests or implement the chronon catalog delegation?

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between f851f29 and 387d643.

📒 Files selected for processing (3)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4 hunks)
  • tools/build_rules/dependencies/spark_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • tools/build_rules/dependencies/spark_repository.bzl
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala
🔇 Additional comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)

1-27: LGTM!

Imports are well-organized and comprehensive.


60-63: LGTM!

Clean factory method implementation.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
cloud_gcp/BUILD.bazel (1)

79-83: Consider documenting the local jar dependency.

Add a comment explaining why this is a local jar.

 java_import(
     name = "iceberg_bigquery_catalog_lib",
+    # Local jar dependency - custom build for BigQuery catalog integration
     jars = ["iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar"],
     visibility = ["//visibility:public"],
 )
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 3754ec3 and 1275acf.

📒 Files selected for processing (5)
  • cloud_gcp/BUILD.bazel (3 hunks)
  • cloud_gcp/src/main/resources/services/org.apache.spark.sql.sources.DataSourceRegister (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (3 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/RenamedBigQueryRelationProvider.scala (1 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
  • tools/build_rules/dependencies/maven_repository.bzl
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: bazel_config_tests
  • GitHub Check: non_spark_tests
🔇 Additional comments (4)
cloud_gcp/src/main/resources/services/org.apache.spark.sql.sources.DataSourceRegister (1)

1-1: LGTM!

Correctly registers the BigQuery data source provider.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/RenamedBigQueryRelationProvider.scala (1)

5-7:

❓ Verification inconclusive

Verify usage of renamed provider.

Implementation looks good. Ensure consumers are updated to use "bigquery-renamed".


🏁 Script executed:

#!/bin/bash
# Search for usage of the renamed provider
rg "bigquery-renamed" --type scala

Length of output: 187


Ensure consumer updates are verified.
The class implementation is correct. However, note that our search only shows the "bigquery-renamed" string in this file. Please manually verify that all consumers reference the new name.

cloud_gcp/BUILD.bazel (2)

1-40: LGTM! Dependencies consolidated effectively.

Clean organization of shared dependencies.


42-49: LGTM! Library configuration is correct.

Resources glob pattern will include DataSourceRegister file.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1)

42-46: ⚠️ Potential issue

Add type checking before casting.

Previous review comment still applies.

-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
-    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    require(internalTable.isInstanceOf[SupportsRead], "Table does not support reading")
+    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+  }

-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
-    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    require(internalTable.isInstanceOf[SupportsWrite], "Table does not support writing")
+    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+  }
🧹 Nitpick comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (3)

97-104: Simplify URI extraction logic.

Extract URI handling into a separate method for better readability.

+  private def extractUri(externalTable: ExternalTableDefinition): String = {
+    val uris = externalTable.getSourceUris.asScala
+    scala.Option(externalTable.getHivePartitioningOptions)
+      .map(_.getSourceUriPrefix)
+      .getOrElse {
+        require(uris.size == 1, s"External table can be backed by only one URI.")
+        uris.head.replaceAll("/\\*\\.parquet$", "")
+      }
+  }

86-124: Improve error handling in loadTable.

Add specific error types and logging.

-    Try { icebergCatalog.loadTable(ident) }
+    Try(icebergCatalog.loadTable(ident))
+      .recoverWith {
+        case e: IllegalArgumentException =>
+          // Log specific error
+          loadBigQueryTable(ident)
+        case e: Exception =>
+          // Log unexpected error
+          Try(loadBigQueryTable(ident))
+      }

59-64: Consider making clients configurable.

Allow injection of BigQuery and Iceberg clients for testing.
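
One hedged way to do this, sketched with assumed names (the real constructor of DelegatingBigQueryMetastoreCatalog may differ): accept client factories with production defaults so tests can swap in fakes.

import com.google.cloud.bigquery.{BigQuery, BigQueryOptions}
import org.apache.spark.sql.connector.catalog.TableCatalog

// Illustrative only: lazily build real clients unless a test supplies its own providers.
class InjectableCatalogSketch(
    icebergCatalogProvider: () => TableCatalog,
    bigQueryClientProvider: () => BigQuery = () => BigQueryOptions.getDefaultInstance.getService
) {
  lazy val icebergCatalog: TableCatalog = icebergCatalogProvider()
  lazy val bigQueryClient: BigQuery = bigQueryClientProvider()
}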

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 1275acf and 59026f1.

📒 Files selected for processing (2)
  • cloud_gcp/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (17)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: bazel_config_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
cloud_gcp/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (1)

1-1: LGTM!

Correctly registers the BigQuery relation provider.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)

85-123: Simplify loadTable recovery logic.

The recovery path is complex and could be extracted into a separate method for better readability.

 override def loadTable(ident: Identifier): Table = {
   Try { icebergCatalog.loadTable(ident) }
-    .recover {
-      case _ => {
-        val tId = ident.namespace().toList match {
-          case database :: Nil            => TableId.of(database, ident.name())
-          case project :: database :: Nil => TableId.of(project, database, ident.name())
-        }
-        val table = bigQueryClient.getTable(tId)
-        table.getDefinition.asInstanceOf[TableDefinition] match {
-          case externalTable: ExternalTableDefinition => {
-            // ... external table handling
-          }
-          case _: StandardTableDefinition => {
-            // ... standard table handling
-          }
-          case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
-        }
-      }
-    }
+    .recover { case _ => loadBigQueryTable(ident) }
     .getOrElse(defaultSessionCatalog.asInstanceOf[TableCatalog].loadTable(ident))
 }
+
+private def loadBigQueryTable(ident: Identifier): Table = {
+  val tId = createTableId(ident)
+  val table = bigQueryClient.getTable(tId)
+  table.getDefinition.asInstanceOf[TableDefinition] match {
+    case externalTable: ExternalTableDefinition => handleExternalTable(externalTable, tId)
+    case _: StandardTableDefinition => handleStandardTable(ident)
+    case _ => throw new UnsupportedOperationException(
+      s"Table type '${table.getDefinition.getType}' is not supported for table: ${table.getFriendlyName}")
+  }
+}

118-118: Enhance error message.

Make the error message more descriptive by including the actual table type.

-            case _ => throw new IllegalStateException(s"Cannot support table of type: ${table.getFriendlyName}")
+            case _ => throw new UnsupportedOperationException(
+              s"Table type '${table.getDefinition.getType}' is not supported for table: ${table.getFriendlyName}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 7f9bc0f and e9d47de.

📒 Files selected for processing (2)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)

41-45: Add type checking before casting.

The unsafe casts could fail at runtime if the internal table doesn't support read/write operations.


51-54: LGTM!

Clean factory method implementation.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)

41-43: 🛠️ Refactor suggestion

Externalize configuration values.

Move hardcoded warehouse path, location, and project values to configuration files.

-        "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
-        "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
-        "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
+        "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
+        "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
+        "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),
🧹 Nitpick comments (2)
cloud_gcp/BUILD.bazel (1)

1-40: Consider organizing dependencies by category.

Group dependencies into categories (e.g., core, testing, logging) for better maintainability.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)

75-84: Document test requirements.

Add clear documentation for the ignored integration tests explaining how to run them locally.

Also applies to: 86-98, 100-112
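
As a sketch of the kind of note the ignored tests could carry (the commands and steps are placeholders, not verified against this repo's tooling):

/** Integration test that talks to a live BigQuery project, so it is `ignore`d in CI.
  *
  * To run it locally:
  *   1. Authenticate: `gcloud auth application-default login`.
  *   2. Point the catalog at your own project/bucket (for example via additional-confs.yaml).
  *   3. Flip `ignore` back to `it` and run the BigQueryCatalogTest suite.
  */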

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between ea79cd3 and ea9e060.

📒 Files selected for processing (2)
  • cloud_gcp/BUILD.bazel (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: non_spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: bazel_config_tests
🔇 Additional comments (6)
cloud_gcp/BUILD.bazel (4)

42-49: LGTM! Good use of shared dependencies.

The scala_library definition is well-structured with proper visibility and resource handling.


51-58: LGTM! Runtime dependencies are properly configured.

The jvm_binary configuration correctly includes necessary runtime dependencies.


79-83: Local jar dependency noted.

Based on previous feedback, this is intentionally using a local jar file.


85-93: LGTM! Test configuration is comprehensive.

Test suite properly includes shared dependencies and test-specific dependencies.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)

3-17: LGTM! Well-organized imports.

Imports are logically grouped and necessary for the functionality.


118-144: LGTM! Comprehensive Kryo serialization test.

The test thoroughly validates serialization and deserialization of ResolvingFileIO.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)

113-139: Add state verification for deserialized object.

Add assertions to verify the internal state of the deserialized ResolvingFileIO matches the original.

     assertTrue("Deserialized object should be an instance of ResolvingFileIO",
                deserializedObj.isInstanceOf[ResolvingFileIO]);
+    val deserialized = deserializedObj.asInstanceOf[ResolvingFileIO]
+    assertEquals("Deserialized object should have same configuration",
+                 original.properties(), deserialized.properties())
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between ea9e060 and 5e3e426.

📒 Files selected for processing (2)
  • cloud_gcp/BUILD.bazel (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • cloud_gcp/BUILD.bazel
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: bazel_config_tests
🔇 Additional comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)

40-42: Externalize configuration values.

Hardcoded GCP configuration values should be moved to environment variables.

-        "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
-        "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
-        "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
+        "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
+        "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
+        "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),

48-52: LGTM! Test cases properly verify runtime classes and GCP providers.

Also applies to: 54-59

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
cloud_gcp/BUILD.bazel (2)

1-41: Consider organizing shared_deps by purpose.

Group related dependencies (e.g., Google Cloud, Scala, Logging) for better maintainability.


38-40: Remove commented code.

Delete the commented spark-bigquery-with-dependencies line.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 5e3e426 and 4982d1d.

📒 Files selected for processing (3)
  • cloud_gcp/BUILD.bazel (2 hunks)
  • tools/build_rules/dependencies/scala_2_12_repository.bzl (1 hunks)
  • tools/build_rules/dependencies/scala_2_13_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • tools/build_rules/dependencies/scala_2_12_repository.bzl
  • tools/build_rules/dependencies/scala_2_13_repository.bzl
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: groupby_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: bazel_config_tests
🔇 Additional comments (4)
cloud_gcp/BUILD.bazel (4)

43-50: LGTM! Good use of shared_deps.

Clean configuration with proper visibility and resource handling.


80-84: Using local jar as previously discussed.

Based on past feedback, this is intentionally a local dependency.


86-94: LGTM! Comprehensive test configuration.

Good inclusion of resources and proper dependency composition.


52-59:

❓ Verification inconclusive

Verify runtime dependencies.

Check if all required Iceberg dependencies are included for the submitter.


🏁 Script executed:

#!/bin/bash
# Search for Iceberg-related dependencies in the codebase
rg "iceberg" -g "BUILD*"

Length of output: 440


Verify Iceberg dependencies for submitter

  • Confirm that "iceberg-spark-runtime-3.5" is sufficient.
  • If the submitter needs BigQuery support, add ":iceberg_bigquery_catalog_lib" (declared in the BUILD file).

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (3)

48-50: Add more assertions for runtime class verification.

Add assertions for class availability and configuration.

 it should "google runtime classes are available" in {
   assertTrue(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.isInstanceOf[HadoopConfigurationProperty[_]])
+  assertNotNull(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.getDefault)
+  assertTrue(GoogleHadoopFileSystemConfiguration.values().nonEmpty)
   assertCompiles("classOf[GoogleHadoopFileSystem]")
   assertCompiles("classOf[GoogleHadoopFS]")
 }

110-136: Add edge case tests for serialization.

Test serialization with non-empty configuration.

     val original = new ResolvingFileIO();
-    original.initialize(Map.empty[String, String].asJava)
+    val config = Map("fs.gs.project.id" -> "test-project")
+    original.initialize(config.asJava)

74-83: Address ignored integration tests.

Multiple integration tests are ignored. Consider using test containers or mocks.

Would you like help setting up BigQuery test containers or mocks for these tests?

Also applies to: 85-97, 99-105
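
As a sketch of the mocking route suggested above, assuming Mockito is available on the test classpath (identifiers are placeholders, and this would not cover the Spark SQL paths that still need a live catalog):

import com.google.cloud.bigquery.{BigQuery, Table, TableId}
import org.mockito.Mockito.{mock, when}

object BigQueryMockSketch {
  // Just enough stubbing for a loadTable-style unit test without a live project.
  def stubbedClient(): BigQuery = {
    val tableId = TableId.of("some-project", "data", "purchases") // placeholder identifiers
    val bq      = mock(classOf[BigQuery])
    when(bq.getTable(tableId)).thenReturn(mock(classOf[Table]))
    bq
  }
}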

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 4982d1d and 7fcb67e.

📒 Files selected for processing (5)
  • cloud_gcp/BUILD.bazel (2 hunks)
  • cloud_gcp/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala (3 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • cloud_gcp/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala
  • cloud_gcp/BUILD.bazel
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: groupby_tests
  • GitHub Check: spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: bazel_config_tests
  • GitHub Check: fetcher_tests
🔇 Additional comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala (1)

66-73: LGTM! Good package encapsulation.

Visibility change enables reuse within cloud_gcp package while maintaining encapsulation.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)

40-42: Externalize configuration values.

Hardcoded GCP configuration values should be moved to environment variables.

-        "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
-        "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
-        "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
+        "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
+        "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
+        "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),

Comment on lines 105 to 130
val tblFormat = GcpFormatProvider(spark).format("data.purchases").get
val partitions = tblFormat.partitions("data.purchases")(spark)
assertEquals(partitions.flatMap(_.keys), Seq("ds"))
}

🛠️ Refactor suggestion

Add table existence check.

Verify table exists before testing partitions.

+    assertNotNull("Table should exist", spark.catalog.getTable("data.purchases"))
     val tblFormat = GcpFormatProvider(spark).format("data.purchases").get
     val partitions = tblFormat.partitions("data.purchases")(spark)
     assertEquals(partitions.flatMap(_.keys), Seq("ds"))

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)

110-136: Add cleanup for Kryo serialization test.

     assertTrue("Deserialized object should be an instance of ResolvingFileIO",
                deserializedObj.isInstanceOf[ResolvingFileIO]);
+    // Cleanup resources
+    outputStream.close()
+    inputStream.close()
   }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 7fcb67e and ffc38d4.

📒 Files selected for processing (3)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
  • tools/build_rules/dependencies/scala_2_12_repository.bzl (1 hunks)
  • tools/build_rules/dependencies/scala_2_13_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • tools/build_rules/dependencies/scala_2_12_repository.bzl
  • tools/build_rules/dependencies/scala_2_13_repository.bzl
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: streaming_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: bazel_config_tests
🔇 Additional comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)

40-42: Move hardcoded GCP configuration to environment variables.

-        "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
-        "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
-        "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
+        "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
+        "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
+        "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),

105-107: Add table existence check before testing partitions.

+    assertNotNull("Table should exist", spark.catalog.getTable("data.purchases"))
     val tblFormat = GcpFormatProvider(spark).format("data.purchases").get
     val partitions = tblFormat.partitions("data.purchases")(spark)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)

85-124: Improve error handling in loadTable.

Missing specific error handling for different failure scenarios.

   override def loadTable(ident: Identifier): Table = {
-    Try { icebergCatalog.loadTable(ident) }
+    Try(icebergCatalog.loadTable(ident))
       .recover {
-        case _ => {
+        case e: Exception => {
+          log.debug(s"Failed to load table from Iceberg catalog: ${e.getMessage}")

58-63: Consider eager initialization of critical services.

Lazy initialization could delay failure detection.

-  @transient private lazy val bqOptions = BigQueryOptions.getDefaultInstance
-  @transient private lazy val bigQueryClient: BigQuery = bqOptions.getService
+  @transient private val bqOptions = BigQueryOptions.getDefaultInstance
+  @transient private val bigQueryClient: BigQuery = bqOptions.getService
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between e542552 and 9a3e311.

📒 Files selected for processing (4)
  • cloud_gcp/BUILD.bazel (3 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
  • tools/build_rules/dependencies/scala_2_12_repository.bzl (1 hunks)
  • tools/build_rules/dependencies/scala_2_13_repository.bzl (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • tools/build_rules/dependencies/scala_2_13_repository.bzl
  • tools/build_rules/dependencies/scala_2_12_repository.bzl
  • cloud_gcp/BUILD.bazel
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: bazel_config_tests
🔇 Additional comments (3)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (3)

41-45: Add type checking before casting.

Unsafe casts could fail at runtime.

-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
-    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    require(internalTable.isInstanceOf[SupportsRead], "Table does not support reading")
+    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+  }

-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
-    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    require(internalTable.isInstanceOf[SupportsWrite], "Table does not support writing")
+    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+  }

51-54: LGTM!

Clean factory method implementation.


96-104: Enhance URI pattern validation.

Silent failures possible if URI pattern doesn't match expectations.

-                  uris.head.replaceAll("/\\*\\.parquet$", "")
+                  val uri = uris.head
+                  val pattern = "/\\*\\.parquet$"
+                  val newUri = uri.replaceAll(pattern, "")
+                  require(newUri != uri, s"URI $uri does not match expected pattern */*.parquet")
+                  newUri

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)

41-45: ⚠️ Potential issue

Add type checking before casting.

Unsafe casts could fail at runtime.

-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
-    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    require(internalTable.isInstanceOf[SupportsRead], "Table does not support reading")
+    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+  }

-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
-    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    require(internalTable.isInstanceOf[SupportsWrite], "Table does not support writing")
+    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+  }

41-45: ⚠️ Potential issue

Add type checking before casting.

Unsafe casts could fail at runtime.

-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
-    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    require(internalTable.isInstanceOf[SupportsRead], "Table does not support reading")
+    internalTable.asInstanceOf[SupportsRead].newScanBuilder(options)
+  }

-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
-    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    require(internalTable.isInstanceOf[SupportsWrite], "Table does not support writing")
+    internalTable.asInstanceOf[SupportsWrite].newWriteBuilder(info)
+  }
🧹 Nitpick comments (8)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (4)

101-104: Add assertions for partition values.

Verify partition values before comparison.

+    assertNotNull("Partitions should not be null", externalPartitions)
     assertEquals(Seq("2023-11-30"), externalPartitions)
+    assertNotNull("Partitions should not be null", nativePartitions)
     assertEquals(Set(20231118, 20231122, 20231125, 20231102, 20231123, 20231119, 20231130, 20231101, 20231117, 20231110, 20231108, 20231112, 20231115, 20231116, 20231113, 20231104, 20231103, 20231106, 20231121, 20231124, 20231128, 20231109, 20231127, 20231129, 20231126, 20231114, 20231107, 20231111, 20231120, 20231105).map(_.toString), nativePartitions.toSet)

99-104: Improve test data management.

Replace hardcoded partition values with test constants or data providers.

-    assertEquals(Set(20231118, 20231122, 20231125, 20231102, ...), nativePartitions.toSet)
+    val EXPECTED_PARTITIONS = Set(20231118, 20231122, /* ... */)
+    assertEquals(EXPECTED_PARTITIONS, nativePartitions.toSet)

48-52: Improve type checking in the test.

Replace wildcard type with explicit type parameter.

-    assertTrue(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.isInstanceOf[HadoopConfigurationProperty[_]])
+    assertTrue(GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.isInstanceOf[HadoopConfigurationProperty[Long]])

101-104: Magic numbers in test assertions.

Extract date values into named constants.

+  private val EXPECTED_EXTERNAL_PARTITION = "2023-11-30"
+  private val EXPECTED_NATIVE_PARTITIONS = Set(20231118, 20231122, 20231125, 20231102, /* ... */)

-    assertEquals(Seq("2023-11-30"), externalPartitions)
+    assertEquals(Seq(EXPECTED_EXTERNAL_PARTITION), externalPartitions)
-    assertEquals(Set(20231118, 20231122, /* ... */), nativePartitions.toSet)
+    assertEquals(EXPECTED_NATIVE_PARTITIONS, nativePartitions.toSet)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (4)

98-105: Enhance URI pattern validation.

Add explicit validation for URI patterns.

               val uris = externalTable.getSourceUris.asScala
               val uri = scala
                 .Option(externalTable.getHivePartitioningOptions)
                 .map(_.getSourceUriPrefix)
                 .getOrElse {
                   require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
-                  uris.head.replaceAll("/\\*\\.parquet$", "")
+                  val uri = uris.head
+                  require(uri.endsWith("/*.parquet"), s"URI must end with /*.parquet: $uri")
+                  uri.replaceAll("/\\*\\.parquet$", "")
                 }

117-119: Document the BigQueryCatalog bug workaround.

Add more context about the bug and workaround.

-              // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
+              // TODO: Remove workaround once https://issues.apache.org/jira/browse/SPARK-XXXXX is fixed
+              // BigQueryCatalog ignores projectId when loading tables, so we pass only dataset and table
               val connectorTable = connectorCatalog.loadTable(Identifier.of(Array(tId.getDataset), tId.getTable))

117-119: Document the BigQueryCatalog bug workaround.

Add a link to the bug tracker issue for future reference.

-              // Hack because there's a bug in the BigQueryCatalog where they ignore the projectId.
+              // TODO: Remove workaround once fixed: https://issues.apache.org/jira/browse/SPARK-XXXXX
+              // BigQueryCatalog currently ignores the projectId, so we need to work around it

85-126: Improve error handling in loadTable.

Add specific error types and logging.

-    Try { icebergCatalog.loadTable(ident) }
+    Try { 
+      icebergCatalog.loadTable(ident) 
+    }.recover {
+      case e: NoSuchTableException => 
+        logger.debug(s"Table not found in Iceberg catalog: ${ident}", e)
+        loadBigQueryTable(ident)
+      case e: Exception =>
+        logger.warn(s"Failed to load table from Iceberg catalog: ${ident}", e)
+        loadBigQueryTable(ident)
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 9a3e311 and 40219d8.

📒 Files selected for processing (2)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2 hunks)
👮 Files not reviewed due to content moderation or server errors (2)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: bazel_config_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: non_spark_tests
🔇 Additional comments (8)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (6)

35-42: Hardcoded configuration values should be externalized.

Move sensitive configuration values to environment variables.

-        "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
-        "spark.sql.catalog.spark_catalog.gcp_location" -> "uc-central1",
-        "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
+        "spark.sql.catalog.spark_catalog.warehouse" -> sys.env.getOrElse("WAREHOUSE_PATH", ""),
+        "spark.sql.catalog.spark_catalog.gcp_location" -> sys.env.getOrElse("GCP_LOCATION", ""),
+        "spark.sql.catalog.spark_catalog.gcp_project" -> sys.env.getOrElse("GCP_PROJECT", ""),

111-137: LGTM! Well-structured Kryo serialization test.

The test thoroughly validates serialization and deserialization of ResolvingFileIO.


40-42: Hardcoded configuration values should be externalized.

Move warehouse, location, and project values to configuration files.


111-137: Well-structured serialization test!

Good coverage of serialization/deserialization with proper resource cleanup.


35-42: Hardcoded configuration values should be externalized.


111-137: LGTM! Good test coverage for serialization.

The test thoroughly verifies Kryo serialization/deserialization of ResolvingFileIO.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (2)

41-45: Add type checking before casting.


98-105: Enhance URI Pattern Validation.

Comment on lines +98 to +138
val uris = externalTable.getSourceUris.asScala
val uri = scala
.Option(externalTable.getHivePartitioningOptions)
.map(_.getSourceUriPrefix)
.getOrElse {
require(uris.size == 1, s"External table ${table} can be backed by only one URI.")
uris.head.replaceAll("/\\*\\.parquet$", "")
}

💡 Verification agent

🧩 Analysis chain

Enhance URI pattern validation.

URI handling could fail silently if pattern doesn't match.


🏁 Script executed:

#!/bin/bash
# Check for non-standard URI patterns in external tables
rg -A 2 'setSourceUris|setSourceUriPrefix' .

Length of output: 963


Update URI Validation Logic

  • In DelegatingBigQueryMetastoreCatalog.scala (lines 98–105), the call to replaceAll("/\\*\\.parquet$", "") may leave the URI unchanged if it doesn't match exactly, causing silent failures.
  • Please add explicit validation (or a warning/error) to detect when the regex doesn't affect the URI, ensuring that non-standard patterns don't slip through.

Contributor

@david-zlai david-zlai left a comment

Couple of comments. This one should be safe to merge first, right, since I think #424 actually plugs it in?

Wait, I forgot that PR merges into this one, lol.

}
}

class GCSFileIOSerializer extends Serializer[GCSFileIO] {
Contributor

Can we leave a comment noting that we're using this custom serializer because Kryo doesn't serialize GCSFileIO?

Also, I was wondering: if Kryo doesn't handle it well, could we potentially have defaulted back to the Java serializer?

Collaborator Author

We could, actually; I had considered that. We would need a class-specific override. That might be better, though; I'll look into changing this.

Collaborator Author

Changed it to use Java serialization. Let me test this on Etsy; I had this originally and can't remember why I switched.

Collaborator Author

This seems to work; I'll stick with Java serialization!

}

override def read(kryo: Kryo, input: Input, `type`: Class[GCSFileIO]): GCSFileIO = {
val props = kryo.readObject(input, classOf[SerializableMap[String, String]])
Contributor

😆

@transient private lazy val icebergCatalog: SparkCatalog = new SparkCatalog()
@transient private lazy val connectorCatalog: BigQueryCatalog = new BigQueryCatalog()

// Some stupid spark settings.
Contributor

😆 Are these required?

Collaborator Author

catalogName corresponds to spark_catalog in the config prop, and it is required for Spark to correctly route to this catalog extension. Let me add a comment.

We don't really use defaultSessionCatalog at all in the GCP setup; it is basically Spark's default HiveCatalogImpl.

Collaborator Author

Actually, I think there might be a more correct way to map the GCP primitives to these values. I will do that in a follow-up and add unit tests.

override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean =
icebergCatalog.dropNamespace(namespace, cascade)

override def listTables(namespace: Array[String]): Array[Identifier] = icebergCatalog.listTables(namespace)
Contributor

For this method and the ones above it, do we eventually need to implement the BQ catalog versions as fallbacks?

Collaborator Author

@tchow-zlai tchow-zlai Mar 5, 2025

As far as I can tell, the Iceberg catalog will do regular BQ catalog operations as intended; the only difference is when it loads a table. It'll fail to loadTable if it's not an Iceberg table, so we try the other clients before finally failing.
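
For readers following along, the fallback chain described here has roughly this shape (a sketch that mirrors the Try/recover/getOrElse structure shown earlier in this PR; the helper and its parameters are illustrative, not the actual class):

import scala.util.Try
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}

object LoadFallbackSketch {
  // 1) Iceberg catalog, 2) plain BigQuery lookup, 3) default session catalog.
  def loadWithFallback(iceberg: TableCatalog,
                       loadFromBigQuery: Identifier => Table,
                       sessionCatalog: TableCatalog,
                       ident: Identifier): Table =
    Try(iceberg.loadTable(ident))
      .recover { case _ => loadFromBigQuery(ident) }
      .getOrElse(sessionCatalog.loadTable(ident))
}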

Contributor

@piyush-zlai piyush-zlai left a comment

To confirm: we don't need any additional streaming tests since we tested this together last week, right? (I don't think I see any major dependency updates since then.)

import org.apache.iceberg.gcp.gcs.GCSFileIO
import org.apache.iceberg.util.SerializableMap

class ChrononIcebergKryoRegistrator extends ChrononKryoRegistrator {
Contributor

Still needed? Based on the description in the PR, you mentioned that it's not, right?

Collaborator Author

This is still needed. We need to register Iceberg classes in Kryo so that we can write Iceberg tables. There is an edge case where GCSFileIO can't be Kryo-serialized (or I haven't gotten it to work, despite some internet research), so for just that class we fall back to regular Java serialization.

@tchow-zlai
Collaborator Author

To confirm: we don't need any additional streaming tests since we tested this together last week, right? (I don't think I see any major dependency updates since then.)

I tested this on Etsy; I plan to roll this out and then test again today. I will roll back if needed.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (1)

18-42: Confirm class registrations are needed.

Remove any classes not actually used at runtime to reduce overhead.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)

55-56: Trivial type check test.

Consider verifying more than just instance type for stronger coverage.


101-103: Avoid hardcoded date literal.

Parameterize or generate this date to reduce future maintenance.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1)

90-91: Avoid null fields.

Use Option or default values to prevent potential null pointer issues.
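
A minimal sketch of the Option-based approach, assuming the fields are set in initialize (this toy class only implements CatalogPlugin; the real catalog implements much more):

import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class OptionBackedPlugin extends CatalogPlugin {
  private var catalogName: Option[String] = None

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit =
    catalogName = Some(name)

  override def name(): String =
    catalogName.getOrElse(throw new IllegalStateException("Catalog was never initialized"))
}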

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 3bf6059 and 0252794.

📒 Files selected for processing (3)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2024-11-26T19:47:53.900Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
⏰ Context from checks skipped due to timeout of 90000ms (16)
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: streaming_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: groupby_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: spark_tests
  • GitHub Check: fetcher_tests
  • GitHub Check: analyzer_tests
  • GitHub Check: join_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: bazel_config_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: fetcher_tests
🔇 Additional comments (6)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (1)

16-16: Use JavaSerializer with caution.

JavaSerializer is more universal but may be slower than Kryo. Confirm performance is acceptable.
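
For context, the per-class fallback typically looks like the following (a sketch only; the PR's registrator extends ChrononKryoRegistrator and registers many more classes, and the class name here is made up):

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.iceberg.gcp.gcs.GCSFileIO
import org.apache.spark.serializer.KryoRegistrator

class JavaFallbackRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit =
    // Keep Kryo for everything else; only GCSFileIO falls back to Java serialization.
    kryo.register(classOf[GCSFileIO], new JavaSerializer())
}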

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (2)

142-157: Add resource cleanup in Kryo tests.

Close the outputStream in a finally block to prevent leaks.


170-185: Same resource cleanup needed.

Wrap output/input streams in try-finally or use a safer approach.
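
One portable way to express this for both tests (try/finally rather than scala.util.Using so it also compiles on Scala 2.12; the helper name is illustrative):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object KryoRoundTripSketch {
  // Streams are closed even if (de)serialization or a later assertion throws.
  def roundTrip(kryo: Kryo, original: AnyRef): AnyRef = {
    val bos    = new ByteArrayOutputStream()
    val output = new Output(bos)
    try kryo.writeClassAndObject(output, original)
    finally output.close() // closing the Output also flushes; ByteArrayOutputStream needs no close

    val input = new Input(new ByteArrayInputStream(bos.toByteArray))
    try kryo.readClassAndObject(input)
    finally input.close()
  }
}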

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (3)

48-52: Check type before casting.

Add require(internalTable.isInstanceOf[SupportsRead]) and require(internalTable.isInstanceOf[SupportsWrite]) to avoid runtime errors.


129-129: Handle null table.

bigQueryClient.getTable(tId) can be null if table doesn't exist.
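
A small sketch of the null check (the BigQuery Java client returns null from getTable for a missing table; the helper itself is illustrative):

import com.google.cloud.bigquery.{BigQuery, Table, TableId}

object NullSafeLookupSketch {
  def requireTable(bq: BigQuery, tId: TableId): Table =
    Option(bq.getTable(tId))
      .getOrElse(throw new IllegalArgumentException(s"BigQuery table not found: $tId"))
}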


137-137: Validate URI modifications.

If the regex doesn't match, URI remains unchanged. Log or fail on unexpected patterns.

tchow-zlai and others added 5 commits March 5, 2025 10:13
Co-authored-by: Thomas Chow <[email protected]>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (4)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (4)

159-185: ⚠️ Potential issue

Same resource leak in GCSFileIO test.

Apply same fix pattern as previous test.


132-157: ⚠️ Potential issue

Fix resource leaks in serialization test.

Missing proper stream closure in try/finally blocks.

-    val outputStream = new ByteArrayOutputStream();
-    val output = new Output(outputStream);
-    kryo.writeClassAndObject(output, original);
-    output.close();
+    val outputStream = new ByteArrayOutputStream()
+    val output = new Output(outputStream)
+    try {
+      kryo.writeClassAndObject(output, original)
+    } finally {
+      output.close()
+      outputStream.close()
+    }

-    val inputStream = new ByteArrayInputStream(outputStream.toByteArray());
-    val input = new Input(inputStream);
-    val deserializedObj = kryo.readClassAndObject(input);
-    input.close();
+    val inputStream = new ByteArrayInputStream(outputStream.toByteArray())
+    val input = new Input(inputStream)
+    var deserializedObj: Any = null
+    try {
+      deserializedObj = kryo.readClassAndObject(input)
+    } finally {
+      input.close()
+      inputStream.close() 
+    }

132-157: 🛠️ Refactor suggestion

Resource leak in ResolvingFileIO serialization test.

Input/output streams not properly closed in finally blocks.

-    val outputStream = new ByteArrayOutputStream();
-    val output = new Output(outputStream);
-    kryo.writeClassAndObject(output, original);
-    output.close();
+    var outputStream: ByteArrayOutputStream = null
+    var output: Output = null
+    try {
+      outputStream = new ByteArrayOutputStream()
+      output = new Output(outputStream)
+      kryo.writeClassAndObject(output, original)
+    } finally {
+      if (output != null) output.close()
+    }

-    val inputStream = new ByteArrayInputStream(outputStream.toByteArray());
-    val input = new Input(inputStream);
-    val deserializedObj = kryo.readClassAndObject(input);
-    input.close();
+    var inputStream: ByteArrayInputStream = null
+    var input: Input = null
+    var deserializedObj: Any = null
+    try {
+      inputStream = new ByteArrayInputStream(outputStream.toByteArray())
+      input = new Input(inputStream)
+      deserializedObj = kryo.readClassAndObject(input)
+    } finally {
+      if (input != null) input.close()
+    }

159-185: 🛠️ Refactor suggestion

Similar resource leak in GCSFileIO serialization test.

Same issue as previous test.

-    val outputStream = new ByteArrayOutputStream();
-    val output = new Output(outputStream);
-    kryo.writeClassAndObject(output, original);
-    output.close();
+    var outputStream: ByteArrayOutputStream = null
+    var output: Output = null
+    try {
+      outputStream = new ByteArrayOutputStream()
+      output = new Output(outputStream)
+      kryo.writeClassAndObject(output, original)
+    } finally {
+      if (output != null) output.close()
+    }

-    val inputStream = new ByteArrayInputStream(outputStream.toByteArray());
-    val input = new Input(inputStream);
-    val deserializedObj = kryo.readClassAndObject(input);
-    input.close();
+    var inputStream: ByteArrayInputStream = null
+    var input: Input = null
+    var deserializedObj: Any = null
+    try {
+      inputStream = new ByteArrayInputStream(outputStream.toByteArray())
+      input = new Input(inputStream)
+      deserializedObj = kryo.readClassAndObject(input)
+    } finally {
+      if (input != null) input.close()
+    }
🧹 Nitpick comments (2)
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (1)

112-118: Add assertions for insertPartitions result.

Verify table contents after insert.

     tableUtils.insertPartitions(
       df,
       "data.tchow_test_iceberg",
       Map(
       "file_format" -> "PARQUET",
       "table_type" -> "iceberg"),
       List("ds"),
-      saveMode = SaveMode.Overwrite)
+      saveMode = SaveMode.Overwrite)
+    
+    // Verify insertion succeeded
+    val insertedData = tableUtils.loadTable("data.tchow_test_iceberg")
+    assertNotNull(insertedData)
+    assertTrue(insertedData.count() > 0)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1)

150-150: Offer help with partitioning

The TODO suggests partition support is missing.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 0252794 and 3281d24.

⛔ Files ignored due to path filters (1)
  • cloud_gcp/iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar is excluded by !**/*.jar
📒 Files selected for processing (16)
  • .github/workflows/require_triggered_status_checks.yaml (1 hunks)
  • cloud_gcp/BUILD.bazel (3 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (1 hunks)
  • cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (1 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (3 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (0 hunks)
  • flink/BUILD.bazel (2 hunks)
  • online/BUILD.bazel (2 hunks)
  • spark/BUILD.bazel (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (1 hunks)
  • spark_install.json (10 hunks)
  • tools/build_rules/dependencies/maven_repository.bzl (5 hunks)
  • tools/build_rules/dependencies/scala_2_12_repository.bzl (1 hunks)
  • tools/build_rules/dependencies/scala_2_13_repository.bzl (1 hunks)
  • tools/build_rules/dependencies/spark_repository.bzl (1 hunks)
💤 Files with no reviewable changes (1)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
🚧 Files skipped from review as they are similar to previous changes (10)
  • flink/BUILD.bazel
  • spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
  • .github/workflows/require_triggered_status_checks.yaml
  • tools/build_rules/dependencies/spark_repository.bzl
  • tools/build_rules/dependencies/scala_2_13_repository.bzl
  • tools/build_rules/dependencies/scala_2_12_repository.bzl
  • spark/BUILD.bazel
  • online/BUILD.bazel
  • spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala
  • tools/build_rules/dependencies/maven_repository.bzl
🧰 Additional context used
🧠 Learnings (2)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2024-11-26T19:47:53.900Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
cloud_gcp/BUILD.bazel (1)
Learnt from: tchow-zlai
PR: zipline-ai/chronon#393
File: cloud_gcp/BUILD.bazel:99-99
Timestamp: 2025-02-22T20:30:28.381Z
Learning: The jar file "iceberg-bigquery-catalog-1.5.2-1.0.1-beta.jar" in cloud_gcp/BUILD.bazel is a local dependency and should not be replaced with maven_artifact.
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: streaming_tests
  • GitHub Check: spark_tests
  • GitHub Check: join_tests
  • GitHub Check: join_tests
  • GitHub Check: groupby_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: analyzer_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (29)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/ChrononIcebergKryoRegistrator.scala (7)

11-16: Serialization fallback explanation is clear.

The comment effectively explains the serialization issues with GCSFileIO closures.


18-41: Complete class list for Iceberg serialization.

List covers essential Iceberg classes for proper serialization.


7-44:

❓ Verification inconclusive

Consider integration tests with actual Iceberg operations.

Add tests verifying this registrator works with real Iceberg operations.


🏁 Script executed:

#!/bin/bash
# Check for integration tests that use ChrononIcebergKryoRegistrator
rg -l "ChrononIcebergKryoRegistrator" --type scala

Length of output: 230


Action: Enhance Integration Testing

It appears that BigQueryCatalogTest.scala references the registrator, but it’s not clear if it covers real Iceberg operations. Please add or verify an integration test that explicitly triggers Iceberg operations on GCS (e.g., writing/reading data) to ensure that the custom registrations, in particular for GCSFileIO, work as expected.


1-6: Imports look good.

Proper import structure for Kryo and Iceberg components.


7-17: Good fallback to JavaSerializer for GCSFileIO.

Using JavaSerializer for GCSFileIO addresses closure serialization issues. Clear documentation of the problem.


18-42: Comprehensive class registration for Iceberg components.

The class list covers essential Iceberg components needed for serialization.


43-44: Efficient registration implementation.

Well-structured method with proper use of superclass initialization.

cloud_gcp/BUILD.bazel (2)

1-40: Good dependency consolidation.

Shared deps pattern improves maintainability.


72-75: Local jar dependency is used appropriately.

This approach is correct for the local iceberg-bigquery-catalog jar.

spark_install.json (10)

3-4: Artifact Hash Update. Updated artifact hashes; ensure alignment with new dependency resolutions.


703-708: Servlet API Update. Shasums and version updated for servlet-api; appears correct.


2294-2326: Dependency Update. New dependencies (e.g., guice and jersey modules) added under hadoop-yarn-common; verify necessity.


2331-2339: Jackson Modules. Minor reordering/update of Jackson modules; benign change.


2441-2450: Hive Shim Update. Updated hive-shims-common and scheduler; dependency chain appears intact.


2523-2529: Hive Shims Expansion. Inclusion of hive-shims-scheduler in hive-shims is noted; check compatibility.


2965-2971: Twill Dependency. Added twill-discovery-api dependency; concise and clear update.


8046-8052: Jasper Runtime. Jasper-runtime updated; confirm artifact hash consistency.


8595-8601: Jasper Runtime Duplicate. Repeated update to jasper-runtime; ensure consistency across entries.


9653-9659: Jasper Sources. Updated jasper-runtime:jar:sources; verify this change is intentional.

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DelegatingBigQueryMetastoreCatalog.scala (6)

49-49: Add safe-cast requirement

Casting to SupportsRead can fail at runtime.


52-52: Add safe-cast requirement

Casting to SupportsWrite can fail at runtime.


89-92: Avoid null fields

Use Option or defaults to prevent NPE risk.


129-129: Check for null

getTable can return null, leading to an NPE.


131-138: Validate URI pattern

replaceAll may silently do nothing if no match.


164-164: Verify catalog fallback

If defaultSessionCatalog is null or not a TableCatalog, fallback may fail.
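
A defensive cast along these lines could fail fast with a clearer message (a sketch; the helper is illustrative and not part of the PR):

import org.apache.spark.sql.connector.catalog.{CatalogPlugin, TableCatalog}

object SessionCatalogSketch {
  def asTableCatalog(plugin: CatalogPlugin): TableCatalog = plugin match {
    case null             => throw new IllegalStateException("Delegated session catalog is not set")
    case tc: TableCatalog => tc
    case other            => throw new IllegalStateException(s"Session catalog ${other.name()} is not a TableCatalog")
  }
}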

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (4)

3-26: Imports organized properly.

Appropriate imports for testing Kryo serialization with Iceberg components.


54-55: Type check improved.

Using wildcard type parameter is more flexible.


36-49:

✅ Verification successful

Configuration commented out for local testing.

Local integration test configuration is commented out. Uncomment when needed.


🏁 Script executed:

#!/bin/bash
# Check if the configuration values exist in environment variables
echo "Checking environment variables for local testing..."
for var in WAREHOUSE_PATH GCP_LOCATION GCP_PROJECT; do
  if [ -z "${!var}" ]; then
    echo "Warning: $var is not set"
  fi
done

Length of output: 546


Action: Leave local integration configuration commented out until needed.

The configuration in
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (lines 36–49) is intentionally commented out for local testing. Verification confirms that the required environment variables (WAREHOUSE_PATH, GCP_LOCATION, GCP_PROJECT) are not set.

  • When running local integration tests, set these environment variables and uncomment the configuration.
  • Otherwise, no action is required.
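
One option if this block is re-enabled later: derive the local-only settings from those same environment variables so nothing has to stay hard-coded. A sketch (the keys mirror the commented-out block; the object name is arbitrary and not wired into the test):

```scala
object LocalIcebergConf {
  // Sketch: only layer in the local-integration settings when all three env vars are present.
  val values: Map[String, String] =
    (for {
      warehouse <- sys.env.get("WAREHOUSE_PATH")
      location  <- sys.env.get("GCP_LOCATION")
      project   <- sys.env.get("GCP_PROJECT")
    } yield Map(
      "spark.sql.catalog.spark_catalog.warehouse"    -> warehouse,
      "spark.sql.catalog.spark_catalog.gcp_location" -> location,
      "spark.sql.catalog.spark_catalog.gcp_project"  -> project
    )).getOrElse(Map.empty)
}
```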

99-130:

❓ Verification inconclusive

Integration test needs verification.

Assertions on partitions look good, but the test is ignored. Consider enabling it when needed.


🏁 Script executed:

#!/bin/bash
# Check partitioning columns in BigQuery
echo "Checking for partitioning column information..."
fd . -t f -e scala | xargs grep -l "partitioning" | xargs grep -A 5 "partitioning"

Length of output: 8263


Integration Test Pending Stub Implementation

  • Assertions on partitioning are correct.
  • Test remains ignored due to live BigQuery dependency.
  • Re-enable once stubs/mock support is added.

Comment on lines +40 to +49
// "spark.hadoop.google.cloud.auth.service.account.enable" -> false.toString,
// "spark.sql.catalog.spark_catalog.catalog-impl" -> classOf[BQMSCatalog].getName,
// "spark.sql.catalog.spark_catalog" -> classOf[DelegatingBigQueryMetastoreCatalog].getName,
// "spark.sql.catalog.spark_catalog.io-impl" -> classOf[ResolvingFileIO].getName,
// "spark.sql.defaultUrlStreamHandlerFactory.enabled" -> false.toString,
// "spark.sql.catalog.spark_catalog.warehouse" -> "gs://zipline-warehouse-canary/data/tables/",
// "spark.sql.catalog.spark_catalog.gcp_location" -> "us-central1",
// "spark.sql.catalog.spark_catalog.gcp_project" -> "canary-443022",
// "spark.sql.catalogImplementation" -> "in-memory",
// "spark.kryo.registrator" -> classOf[ChrononIcebergKryoRegistrator].getName,

💡 Verification agent

🧩 Analysis chain

Commented config needs environment variables.

Uncomment for local testing and set required environment variables.


🏁 Script executed:

#!/bin/bash
# Check for testing environment variables
echo "Required environment variables for local testing:"
echo "WAREHOUSE_PATH, GCP_LOCATION, GCP_PROJECT"
env | grep -E "WAREHOUSE_PATH|GCP_LOCATION|GCP_PROJECT" || echo "No environment variables set"

Length of output: 326


Action: Configure Local Testing Environment

  • The config lines remain commented out.
  • Local tests require the environment variables: WAREHOUSE_PATH, GCP_LOCATION, and GCP_PROJECT.
  • Please uncomment the config in cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala (lines 40–49) and set these variables.

@tchow-zlai tchow-zlai merged commit 0ea24ad into main Mar 5, 2025
20 checks passed
@tchow-zlai tchow-zlai deleted the tchow/catalog branch March 5, 2025 18:38
tchow-zlai added a commit that referenced this pull request Mar 8, 2025
## Summary

- Modifying TableUtils to use spark's built-in catalog wherever
applicable. This will both improve performance and allow us to access
iceberg tables through the bigquery catalog, since it's built on
#393.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced external checkout queries now deliver additional purchase
details for richer data insights.

- **Refactor**
- Updated cloud integrations to enforce supported table formats and
provide clearer error messaging.
- Streamlined table creation and partitioning processes in Spark for
improved performance and maintainability.
- Standardized collection types for partitioning and sorting parameters
in Dataframe operations.

- **Tests**
- Refined test cases to validate dynamic provider handling and verify
consistent table operations.
- Removed outdated test case for field name retrieval, reflecting
changes in validation focus.
  - Updated assertions to utilize the new format provider access method.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
@coderabbitai coderabbitai bot mentioned this pull request Mar 20, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
## Summary

Large PR here. This is the first part to supporting iceberg using
bigquery as a metastore. There are a few components to this:

1. `DelegatingBigQueryMetastoreCatalog` is the main actor in all of
this. This abstraction wraps the iceberg bigquery catalog that we've
introduced through a local jar download. The reason for wrapping it
instead of simply using it is so that we can allow it to handle
non-iceberg tables in both sql and non-sql spark contexts. This is
useful for reading Etsy's beacon datasets which are simply parquet
external tables, as well as their CDC streams that are bigquery native
tables.
2. `GCSFileOSerializer` is a simple wrapper that uses regular java
serialization instead of kryo to handle the GCSFileIO, since Kryo
doesn't handle closure serialization very well. I had added a few
classes into the kryo registrator for serializing closures but it still
doesn't seem to work in an actual spark job. I ultimately had to fall
back to regular java serialization, but since this is for just one class
that's not on the hotpath it should be fine.
3. Some serialization unit tests.
4. Lots of jar wrangling to get things to work in the right way. We'll
have to make sure this doesn't break the streaming / fetching side of
things as well.

Things to note are:
1. Had to submit a patch to the spark bigquery connector code
GoogleCloudDataproc/spark-bigquery-connector#1340
because the connector does not support three-part namespacing. As such,
you can only query tables that belong to the same project within a
single sql query until the above patch is in.
2. You can only write iceberg tables to the currently configured project.
The project is configured using `additional-confs.yaml` and I used the
following config set to test this behavior:

```yaml
spark.chronon.table.format_provider.class: "ai.chronon.integrations.cloud_gcp.GcpFormatProvider"
spark.chronon.partition.format: "yyyy-MM-dd"
spark.chronon.table.gcs.temporary_gcs_bucket: "zipline-warehouse-canary"
spark.chronon.partition.column: "ds"
spark.chronon.table.gcs.connector_output_dataset: "data"
spark.chronon.table.gcs.connector_output_project: "canary-443022"
spark.chronon.coalesce.factor: "10"
spark.default.parallelism: "10"
spark.sql.shuffle.partitions: "10"
spark.chronon.table_write.prefix: "gs://zipline-warehouse-canary/data/tables/"
spark.sql.catalog.spark_catalog.warehouse: "gs://zipline-warehouse-canary/data/tables/"
spark.sql.catalog.spark_catalog.gcp_location: "us-central1"
spark.sql.catalog.spark_catalog.gcp_project: "canary-443022"
spark.sql.catalog.spark_catalog.catalog-impl: org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
spark.sql.catalog.spark_catalog: ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog
spark.sql.catalog.spark_catalog.io-impl: org.apache.iceberg.io.ResolvingFileIO
spark.sql.defaultUrlStreamHandlerFactory.enabled: "false"
spark.kryo.registrator: "ai.chronon.integrations.cloud_gcp.GCPKryoRegistrator"
```

3. I had to remove
https://github.com/zipline-ai/infrastructure/blob/e30ae1470d4568e3ae2dab384c4d8971dac973c9/base-gcp/dataproc.tf#L121
from the cluster because it conflicts with the metastore and connector
jar being brought in here as dependencies. We'll need to rebuild our
clusters (and the ones on Etsy) _without_ the jar cc @chewy-zlai
4. Also made a change to the canary branch
zipline-ai/canary-confs@65fac34
to remove the project ID. This is not supported using the catalogs we
have. The configured project
`spark.sql.catalog.spark_catalog.gcp_project` is taken into account
across the board.

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [x] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced enhanced catalog management for seamless integration
between BigQuery and Iceberg.
- Added custom serialization logic to improve stability and performance.

- **Refactor / Dependency Updates**
- Streamlined dependency management and updated various library versions
for smoother runtime behavior.
- Refined Spark session configuration to ensure consistent application
of settings.
  - Added new dependencies related to Hadoop client API.

- **Tests**
- Expanded integration tests for BigQuery functionality and Kryo
serialization.
  - Removed obsolete test cases to focus on relevant validation.

- **CI/CD**
- Updated workflow triggers to activate on push events for improved
integration responsiveness.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025