
🎉 New Destination: Apache Iceberg #18836


Merged (24 commits, Nov 18, 2022)

Conversation

Leibnizhu
Contributor

@Leibnizhu Leibnizhu commented Nov 2, 2022

What

Add Apache Iceberg Destination.
Closes #2833. Related to #6745.

How

Add Apache Iceberg Destination, implemented with Spark SQL (3.3.0) and iceberg-spark-runtime (1.0.0).
Spark runs in local mode.
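For context, a minimal sketch (not code from this PR) of how a local-mode SparkSession is typically wired to an Iceberg catalog; the catalog name, metastore URI, and warehouse path below are illustrative placeholders:

    import org.apache.spark.sql.SparkSession;

    class LocalSparkExample {
        // Hedged sketch: local-mode Spark session with the Iceberg runtime's SQL
        // extensions and a Hive-backed Iceberg catalog. All values are placeholders.
        static SparkSession build() {
            return SparkSession.builder()
                .master("local")
                .appName("destination-iceberg")
                .config("spark.sql.extensions",
                    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.iceberg.type", "hive")
                .config("spark.sql.catalog.iceberg.uri", "thrift://localhost:9083")
                .config("spark.sql.catalog.iceberg.warehouse", "s3a://my-bucket/warehouse")
                .getOrCreate();
        }
    }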

Recommended reading order

  1. spec.json
  2. IcebergDestination.java
  3. IcebergConsumer.java

🚨 User Impact 🚨

No breaking changes; this PR only adds a new destination.

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally, e.g. a screenshot or copy-paste of unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow the instructions in the README. For Java connectors, run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the connector is published, connector added to connector index as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here

Tests

Unit

[screenshot: unit test results]

see https://gist.github.com/Leibnizhu/c3eaad1b04ad8a8becb77cda28b47313

Integration

execute:

./gradlew :airbyte-integrations:connectors:destination-iceberg:integrationTestJava --tests "*IntegrationTest"

[screenshot: integration test results]

see https://gist.github.com/Leibnizhu/33977e02c191e8d0da68c3a1af2bce57

Acceptance

execute:

# write config in `secrets/config.json`
./gradlew :airbyte-integrations:connectors:destination-iceberg:integrationTestJava --tests "*AcceptanceTest"

see https://gist.github.com/Leibnizhu/c6c76cc9f42856795bb4895ee434849c

@CLAassistant

CLAassistant commented Nov 2, 2022

CLA assistant check
All committers have signed the CLA.

@github-actions github-actions bot added area/connectors Connector related issues area/documentation Improvements or additions to documentation labels Nov 2, 2022
@Leibnizhu Leibnizhu changed the title Add Apache Iceberg Destination 🎉 New Destination: Apache Iceberg Nov 2, 2022
@Leibnizhu Leibnizhu marked this pull request as ready for review November 2, 2022 06:55
@Leibnizhu Leibnizhu marked this pull request as draft November 2, 2022 08:29
@Leibnizhu Leibnizhu marked this pull request as ready for review November 2, 2022 09:35
@marcosmarxm
Member

Thanks for the contribution @Leibnizhu, someone from the team will review it during the week.

@marcosmarxm marcosmarxm requested a review from itaseskii November 2, 2022 22:17
@sajarin sajarin added the bounty-XL Maintainer program: claimable extra large bounty PR label Nov 7, 2022
@marcosmarxm
Member

@Leibnizhu do you need assistance or review on this work?

@Leibnizhu
Contributor Author

@Leibnizhu do you need assistance or review on this work?

Yep, please give some advice.

if (!namespaceSet.contains(namespace)) {
    namespaceSet.add(namespace);
    try {
        spark.sql("CREATE DATABASE IF NOT EXISTS " + namespace);
Contributor

When it comes to SQL databases, most of the time the namespace is used to separate the data into logical schemas. Is it really needed to create a new database per namespace?

Contributor Author

For Iceberg, a table must belong to a database. Iceberg's databases are for physical isolation, and their metadata is stored in the catalog (e.g. HiveCatalog, backed by a Hive metastore server).
In this PR, we assume that one namespace maps to one Iceberg database.
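As a hedged illustration of that mapping (the class and method names below are hypothetical, not from this PR), each stream is addressed by an Iceberg TableIdentifier whose first part is the namespace/database:

    import org.apache.iceberg.catalog.TableIdentifier;

    class NamespaceMapping {
        // Hedged sketch: one Airbyte namespace -> one Iceberg database,
        // one stream -> one table inside that database.
        static TableIdentifier toIcebergTable(String namespace, String streamName) {
            return TableIdentifier.of(namespace, streamName);
        }
    }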

writeConfig.isAppendMode() ? "append" : "overwrite",
tempTableName,
finalTableName);
spark.sql("SELECT * FROM %s".formatted(tempTableName))
Contributor

Can you point me to where the temp and final tables are created? Is this done implicitly?

Contributor Author

@Leibnizhu Leibnizhu Nov 15, 2022

They are created implicitly by Spark SQL (saveAsTable).
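For illustration, a hedged sketch of how saveAsTable creates a table on first write (the buffer and table names are assumptions, not code from this PR):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    class TempTableWriter {
        // Hedged sketch: saveAsTable creates the target table if it does not exist,
        // so no explicit CREATE TABLE is issued for the temp table. Whether it ends up
        // as an Iceberg table depends on the catalog configured on the SparkSession.
        static void flush(Dataset<Row> buffer, String tempTableName) {
            buffer.write()
                  .mode(SaveMode.Append)
                  .saveAsTable(tempTableName);
        }
    }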

Contributor

IMO, temp tables should be created in the startTracked() method, which is used for executing all the preparation steps before starting to receive records. It is better to fail fast if you can't create something than to find out midway through replicating data.
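A hedged sketch of that suggestion, assuming a consumer that overrides Airbyte's startTracked() hook (WriteConfig and its getters are hypothetical names used only for illustration):

    // Hedged sketch: create every namespace and temp table up front so failures
    // surface before any records are consumed. writeConfigs and spark are fields
    // of the assumed consumer class.
    @Override
    protected void startTracked() throws Exception {
        for (WriteConfig writeConfig : writeConfigs.values()) {
            spark.sql("CREATE DATABASE IF NOT EXISTS " + writeConfig.getNamespace());
            spark.sql("CREATE TABLE IF NOT EXISTS " + writeConfig.getFullTempTableName()
                + " (data STRING) USING iceberg");
        }
    }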

public AirbyteConnectionStatus check(JsonNode config) {
    try {
        IcebergCatalogConfig icebergCatalogConfig = icebergCatalogConfigFactory.fromJsonNodeConfig(config);
        icebergCatalogConfig.check();
Contributor

Not a big fan of performing business logic in a config class. IMO, config classes should only contain methods for the initialization of objects. The cleanup logic is also error-prone: one example is if you manage to create a table with

Table tempTable = catalog.createTable(tempTableId, schema);

but somehow fail on

    try (CloseableIterable<Record> records = IcebergGenerics.read(tempTable).build()) {
        for (Record record : records) {
            // never reach
            log.info("Record in temp table: {}", record);
        }
    }

(by the way, why do you read records instead of writing?) then your cleanup logic with

boolean dropSuccess = catalog.dropTable(tempTableId);

won't be executed, and for that reason such logic should be part of a finally {} block.
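A hedged sketch of the suggested shape, using the Iceberg catalog API with the cleanup in a finally block (the wrapper class and method are illustrative, not the PR's actual code):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;

    class ConnectionCheck {
        // Hedged sketch: create a throwaway table, exercise a write (and optionally a
        // read), and always drop the table in finally so a partial failure never leaks it.
        static void check(Catalog catalog, TableIdentifier tempTableId, Schema schema) throws Exception {
            Table tempTable = catalog.createTable(tempTableId, schema);
            try {
                // ... write a test record to tempTable and read it back here ...
            } finally {
                catalog.dropTable(tempTableId);
            }
        }
    }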

@itaseskii
Contributor

itaseskii commented Nov 17, 2022

/test connector=connectors/destination-iceberg

🕑 connectors/destination-iceberg https://github.com/airbytehq/airbyte/actions/runs/3492896620
❌ connectors/destination-iceberg https://github.com/airbytehq/airbyte/actions/runs/3492896620
🐛 https://gradle.com/s/zhnfxj65phxoi

Build Failed

Test summary info:

Could not find result summary

@itaseskii
Contributor

itaseskii commented Nov 17, 2022

Hey @Leibnizhu what type of setup/config do we need to get the tests in IcebergDestinationAcceptanceTest passing? I see that you have different acceptance tests for different configurations so I was wondering what is the point of this test class? :)

@Leibnizhu
Contributor Author


Hey @Leibnizhu what type of setup/config do we need to get the tests in IcebergDestinationAcceptanceTest passing? I see that you have different acceptance tests for different configurations so I was wondering what is the point of this test class? :)

IcebergDestinationAcceptanceTest is written for acceptance tests against an external Iceberg catalog, for local development. Examples for config.json are in the secrets-examples dir.

IcebergDestinationAcceptanceTest needs an Iceberg catalog service (a Hive metastore, a JDBC database, or HDFS) and a storage service (S3 protocol, for example Amazon S3 or MinIO).

I have already written tests with Testcontainers in the packages io.airbyte.integrations.destination.iceberg.hadoop, io.airbyte.integrations.destination.iceberg.hive, and io.airbyte.integrations.destination.iceberg.jdbc, so IcebergDestinationAcceptanceTest can be removed.

@itaseskii
Contributor

…so IcebergDestinationAcceptanceTest can be removed.

@Leibnizhu okay then please remove it so we can have a passing test run in the CI

@marcosmarxm
Member

marcosmarxm commented Nov 18, 2022

/test connector=connectors/destination-iceberg

🕑 connectors/destination-iceberg https://github.com/airbytehq/airbyte/actions/runs/3498603268
✅ connectors/destination-iceberg https://github.com/airbytehq/airbyte/actions/runs/3498603268
No Python unittests run

Build Passed

Test summary info:

All Passed

@Data
public class FormatConfig {

    public static final int DEFAULT_FLUSH_BATCH_SIZE = 10000;
Contributor

Buffering records based on count alone is a bit risky, since 10_000 records can quickly exhaust your memory depending on their size. It is better if the actual storage size is taken into consideration as well.
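A hedged sketch of a count-and-size flush check (names and limits are illustrative, not part of this PR):

    import java.util.ArrayList;
    import java.util.List;

    class RecordBuffer {
        // Hedged sketch: flush when either the record count or the estimated
        // buffered byte size crosses its threshold. Limits are placeholders.
        private static final int MAX_RECORDS = 10_000;
        private static final long MAX_BUFFER_BYTES = 64L * 1024 * 1024; // ~64 MiB

        private final List<String> rows = new ArrayList<>();
        private long bufferedBytes = 0;

        void add(String serializedRecord) {
            rows.add(serializedRecord);
            bufferedBytes += serializedRecord.length(); // rough size estimate
            if (rows.size() >= MAX_RECORDS || bufferedBytes >= MAX_BUFFER_BYTES) {
                flush();
            }
        }

        void flush() {
            // ... write the buffered rows out as one batch, then reset ...
            rows.clear();
            bufferedBytes = 0;
        }
    }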

@itaseskii
Contributor

@marcosmarxm Overall this connector seems solid, with some improvement points from my side. I think that we can proceed with merging.

Member

@marcosmarxm marcosmarxm left a comment

Thanks @Leibnizhu

@Leibnizhu
Contributor Author

@marcosmarxm Overall this connector seems solid, with some improvement points from my side. I think that we can proceed with merging.

Thanks for reviewing. I will keep improving these points in the next PR.

Leibnizhu added a commit to Leibnizhu/airbyte that referenced this pull request Nov 19, 2022
1. create temp table in startTracked()
2. extract iceberg spark sql operations to IcebergOperations, from IcebergConsumer and IcebergCatalogConfig
3. in check() method, create a temp table, write something, read it back, and finally drop the table.
@wahbiharibi

Trying to test the connector but getting the following error. Any idea?


2023-02-06 16:36:22 ERROR i.a.w.i.DefaultAirbyteStreamFactory(internalLog):163 - Exception attempting to access the Iceberg catalog:
airbyte-worker | Stack Trace: org.apache.iceberg.exceptions.ValidationException: Invalid S3 URI, cannot determine scheme: file:/user/hive/warehouse/temp_1675701381294/metadata/00000-2b6c90f7-2314-4a38-bf3b-bc33ffbdb64b.metadata.json
airbyte-worker | at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
airbyte-worker | at org.apache.iceberg.aws.s3.S3URI.<init>(S3URI.java:72)
airbyte-worker | at org.apache.iceberg.aws.s3.S3OutputFile.fromLocation(S3OutputFile.java:40)
airbyte-worker | at org.apache.iceberg.aws.s3.S3FileIO.newOutputFile(S3FileIO.java:132)
airbyte-worker | at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadata(BaseMetastoreTableOperations.java:157)
airbyte-worker | at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:234)
airbyte-worker | at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:133)
airbyte-worker | at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:174)
airbyte-worker | at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:75)
airbyte-worker | at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:118)
airbyte-worker | at io.airbyte.integrations.destination.iceberg.config.catalog.IcebergCatalogConfig.check(IcebergCatalogConfig.java:47)
airbyte-worker | at io.airbyte.integrations.destination.iceberg.IcebergDestination.check(IcebergDestination.java:49)
airbyte-worker | at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:125)
airbyte-worker | at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:100)
airbyte-worker | at io.airbyte.integrations.destination.iceberg.IcebergDestination.main(IcebergDestination.java:42)

@Leibnizhu
Contributor Author

Leibnizhu commented Feb 6, 2023

Trying to test the connector but getting the following error. Any idea?

@wahbiharibi is your Hive default database created as an Iceberg database?

Normally, the Hive default database is created by Hive, and its location is local or HDFS, so the default database cannot be used as an Iceberg database with S3 storage.

You could try another, nonexistent database name, and the Iceberg connector will create it with an S3 location for you.
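A hedged sketch of what that effectively amounts to in Spark SQL (the database name and bucket path are placeholders; spark is the connector's configured SparkSession):

    // Hedged sketch: the new database is created with an explicit S3 location,
    // so its tables land on S3 rather than the local/HDFS Hive warehouse.
    spark.sql("CREATE DATABASE IF NOT EXISTS airbyte_iceberg_db "
        + "LOCATION 's3a://my-bucket/warehouse/airbyte_iceberg_db'");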

For example, the default database and an Iceberg database as shown in the Hive metastore server's DBS table:

[screenshot: Hive metastore DBS table showing both databases]

@wahbiharibi

Thanks @Leibnizhu. It worked :)

@natalyjazzviolin
Contributor

Currently blocked by https://github.com/airbytehq/airbyte-internal-issues/issues/2695, waiting on a sandbox account.

@OmarSultan85

OmarSultan85 commented May 23, 2023

I am not sure if this is the right place to post, but I was wondering if there is a way to change the catalog name in the Spark config file. I noticed that it is using the name iceberg based on the Constants value of CATALOG_NAME. Is there any way to make this configurable or add it to the spec? I am referring here to the JDBC catalog.
