fix: respect project id for incoming identifier #1340


Open · wants to merge 2 commits into `master`

Conversation

thomaschow commented:

  • Need to respect the incoming project id as well.


google-cla bot commented Feb 23, 2025

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

```diff
@@ -130,7 +131,9 @@ public Table loadTable(Identifier identifier) throws NoSuchTableException {
                 Spark35BigQueryTable::new,
                 null,
                 ImmutableMap.of(
-                    "dataset", identifier.namespace()[0], "table", identifier.name())));
+                    "project", tableId.getProject(),
```
Member commented:

I'm afraid it's not that trivial - the project may be null in the TableId. Also, we need to add tests for cross-project reading.
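
For illustration only (not the connector's actual code), the fallback the reviewer is pointing at could look something like the sketch below, where `defaultProject` is a hypothetical value standing in for however the catalog resolves its configured project:

```java
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

// Sketch only: respect the incoming project when present, otherwise fall back
// to a configured default. `defaultProject` is a hypothetical parameter here,
// not an actual connector API.
final class TablePropertiesSketch {
  static Map<String, String> tableProperties(TableId tableId, String defaultProject) {
    String project = tableId.getProject() != null ? tableId.getProject() : defaultProject;
    return ImmutableMap.of(
        "project", project,
        "dataset", tableId.getDataset(),
        "table", tableId.getTable());
  }
}
```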

Author (thomaschow) commented:

Is there an existing test suite for this class? I understand it's fairly new so no worries if not. I'm happy to add some if this change looks directionally sound. Please advise!

tchow-zlai added a commit to zipline-ai/chronon that referenced this pull request Mar 5, 2025
## Summary

Large PR here. This is the first part of supporting Iceberg using
BigQuery as a metastore. There are a few components to this:

1. `DelegatingBigQueryMetastoreCatalog` is the main actor in all of
this. This abstraction wraps the Iceberg BigQuery catalog that we've
introduced through a local jar download. The reason for wrapping it
instead of using it directly is so that it can also handle
non-Iceberg tables in both SQL and non-SQL Spark contexts (see the
sketch after this list). This is useful for reading Etsy's beacon
datasets, which are simply Parquet external tables, as well as their
CDC streams, which are BigQuery native tables.
2. `GCSFileOSerializer` is a simple wrapper that uses regular Java
serialization instead of Kryo to handle the `GCSFileIO`, since Kryo
doesn't handle closure serialization very well. I had added a few
classes to the Kryo registrator for serializing closures, but it still
doesn't seem to work in an actual Spark job. I ultimately had to fall
back to regular Java serialization, but since this is for just one
class that's not on the hot path it should be fine (a registrator
sketch follows the notes below).
3. Some serialization unit tests.
4. Lots of jar wrangling to get things to work in the right way. We'll
have to make sure this doesn't break the streaming / fetching side of
things as well.
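
To make the delegation idea in (1) concrete, here is a minimal sketch. The class and field names are illustrative only, not the actual chronon implementation (which is written in Scala and implements the full Spark `TableCatalog` interface):

```java
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;

// Illustrative sketch of the delegation pattern only.
class DelegatingCatalogSketch {
  private final TableCatalog icebergCatalog;   // wrapped BigQueryMetastoreCatalog
  private final TableCatalog fallbackCatalog;  // handles native / external BigQuery tables

  DelegatingCatalogSketch(TableCatalog icebergCatalog, TableCatalog fallbackCatalog) {
    this.icebergCatalog = icebergCatalog;
    this.fallbackCatalog = fallbackCatalog;
  }

  Table loadTable(Identifier ident) throws NoSuchTableException {
    try {
      // Try the Iceberg BigQuery metastore catalog first.
      return icebergCatalog.loadTable(ident);
    } catch (NoSuchTableException e) {
      // Not an Iceberg table: fall back so Parquet external tables and
      // BigQuery native tables still resolve in SQL and non-SQL contexts.
      return fallbackCatalog.loadTable(ident);
    }
  }
}
```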

Things to note:
1. Had to submit a patch to the Spark BigQuery connector,
GoogleCloudDataproc/spark-bigquery-connector#1340,
because the connector does not support three-part namespacing. As such,
you can only query tables that belong to the same project within a
single SQL query until the above patch is in.
2. You can only write Iceberg tables to the currently configured project.
The project is configured using `additional-confs.yaml`, and I used the
following config set to test this behavior:

```yaml
spark.chronon.table.format_provider.class: "ai.chronon.integrations.cloud_gcp.GcpFormatProvider"
spark.chronon.partition.format: "yyyy-MM-dd"
spark.chronon.table.gcs.temporary_gcs_bucket: "zipline-warehouse-canary"
spark.chronon.partition.column: "ds"
spark.chronon.table.gcs.connector_output_dataset: "data"
spark.chronon.table.gcs.connector_output_project: "canary-443022"
spark.chronon.coalesce.factor: "10"
spark.default.parallelism: "10"
spark.sql.shuffle.partitions: "10"
spark.chronon.table_write.prefix: "gs://zipline-warehouse-canary/data/tables/"
spark.sql.catalog.spark_catalog.warehouse: "gs://zipline-warehouse-canary/data/tables/"
spark.sql.catalog.spark_catalog.gcp_location: "us-central1"
spark.sql.catalog.spark_catalog.gcp_project: "canary-443022"
spark.sql.catalog.spark_catalog.catalog-impl: org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog
spark.sql.catalog.spark_catalog: ai.chronon.integrations.cloud_gcp.DelegatingBigQueryMetastoreCatalog
spark.sql.catalog.spark_catalog.io-impl: org.apache.iceberg.io.ResolvingFileIO
spark.sql.defaultUrlStreamHandlerFactory.enabled: "false"
spark.kryo.registrator: "ai.chronon.integrations.cloud_gcp.GCPKryoRegistrator"
```

3. I had to remove
https://github.com/zipline-ai/infrastructure/blob/e30ae1470d4568e3ae2dab384c4d8971dac973c9/base-gcp/dataproc.tf#L121
from the cluster because it conflicts with the metastore and connector
jars being brought in here as dependencies. We'll need to rebuild our
clusters (and the ones on Etsy) _without_ that jar (cc @chewy-zlai).
4. Also made a change to the canary branch
zipline-ai/canary-confs@65fac34
to remove the project ID. This is not supported using the catalogs we
have. The configured project
`spark.sql.catalog.spark_catalog.gcp_project` is taken into account
across the board.
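
Relatedly, the `spark.kryo.registrator` entry in the config above points Spark at `GCPKryoRegistrator`. As a rough illustration of the idea in component (2) only, and assuming Kryo's built-in `JavaSerializer` rather than the actual `GCSFileOSerializer` wrapper, a registrator could route `GCSFileIO` through plain Java serialization like this:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.iceberg.gcp.gcs.GCSFileIO;
import org.apache.spark.serializer.KryoRegistrator;

// Illustrative sketch only; the real GCPKryoRegistrator registers more classes
// and uses the custom GCSFileOSerializer wrapper instead of Kryo's JavaSerializer.
public class GcpKryoRegistratorSketch implements KryoRegistrator {
  @Override
  public void registerClasses(Kryo kryo) {
    // Serialize GCSFileIO with plain Java serialization, sidestepping Kryo's
    // trouble with the closures it captures.
    kryo.register(GCSFileIO.class, new JavaSerializer());
  }
}
```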

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [x] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Introduced enhanced catalog management for seamless integration between BigQuery and Iceberg.
  - Added custom serialization logic to improve stability and performance.

- **Refactor / Dependency Updates**
  - Streamlined dependency management and updated various library versions for smoother runtime behavior.
  - Refined Spark session configuration to ensure consistent application of settings.
  - Added new dependencies related to Hadoop client API.

- **Tests**
  - Expanded integration tests for BigQuery functionality and Kryo serialization.
  - Removed obsolete test cases to focus on relevant validation.

- **CI/CD**
  - Updated workflow triggers to activate on push events for improved integration responsiveness.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
kumar-zlai pushed a commit to zipline-ai/chronon that referenced this pull request Apr 25, 2025
kumar-zlai pushed a commit to zipline-ai/chronon that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit to zipline-ai/chronon that referenced this pull request May 15, 2025
chewy-zlai pushed a commit to zipline-ai/chronon that referenced this pull request May 15, 2025
chewy-zlai pushed a commit to zipline-ai/chronon that referenced this pull request May 16, 2025