🎉 New Destination: Apache Iceberg #18836
Conversation
Force-pushed from 8c4a341 to 658b609.
thanks for the contribution @Leibnizhu, someone from the team will review during the week.
@Leibnizhu do you need assistance or review on this work?
yep, please give some advice
```java
if (!namespaceSet.contains(namespace)) {
  namespaceSet.add(namespace);
  try {
    spark.sql("CREATE DATABASE IF NOT EXISTS " + namespace);
```
When it comes to SQL databases, most of the time the namespace is used to separate data into logical schemas. Is it really necessary to create a new database per namespace?
For Iceberg, a table must belong to a database. Iceberg's databases provide physical isolation, and their metadata is stored in the catalog (e.g. a HiveCatalog backed by a Hive Metastore server).
In this PR, we assume that each namespace maps to one Iceberg database.
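To make that mapping concrete, here is a hedged sketch of how a stream namespace could be turned into an Iceberg database and table identifier through the catalog API; the helper and its names are hypothetical, not code from this PR:

```java
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical helper, not code from this PR: one Iceberg database per
// Airbyte namespace, created on first use.
public final class NamespaceMapper {

  public static TableIdentifier tableIdFor(Catalog catalog, String namespace, String streamName) {
    Namespace db = Namespace.of(namespace);
    // Hive, JDBC and most other production catalogs implement SupportsNamespaces.
    if (catalog instanceof SupportsNamespaces nsCatalog && !nsCatalog.namespaceExists(db)) {
      nsCatalog.createNamespace(db);
    }
    // Table name = stream name, qualified by the namespace's database.
    return TableIdentifier.of(db, streamName);
  }
}
```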
Resolved (outdated) review comment on ...on-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergDestination.java
```java
    writeConfig.isAppendMode() ? "append" : "overwrite",
    tempTableName,
    finalTableName);
spark.sql("SELECT * FROM %s".formatted(tempTableName))
```
Can you point me to where the temp and final tables are created? Is this done implicitly?
They are created implicitly by Spark SQL (`saveAsTable`).
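For readers unfamiliar with this Spark behavior, here is a minimal sketch of an implicit table creation via `saveAsTable`, assuming a SparkSession already configured with an Iceberg catalog; the method and table names are illustrative, not from this PR:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public final class ImplicitTableWrite {

  // Writing to a fully qualified name in an Iceberg catalog creates the table
  // on first write; there is no explicit CREATE TABLE anywhere.
  static void writeTempTable(Dataset<Row> batch, String tempTableName) {
    batch.write()
        .format("iceberg")           // route through the Iceberg data source
        .mode(SaveMode.Append)       // or SaveMode.Overwrite for "overwrite"
        .saveAsTable(tempTableName); // e.g. "iceberg.my_namespace._airbyte_tmp_users"
  }
}
```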
Imo temp tables should be created in the `startTracked()` method, which is used for executing all the preparation steps before starting to receive records. It is better to fail fast if you can't create something than to find out midway through replicating data.
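A minimal sketch of that fail-fast approach, as a standalone helper that `startTracked()` could delegate to; `StreamWriteConfig` and all names here are hypothetical stand-ins for the connector's actual write configs:

```java
import java.util.Collection;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical sketch of the fail-fast idea: create every temp table up front
// (e.g. from startTracked()) instead of lazily on the first record.
public final class TempTablePreparer {

  // Minimal stand-in for the connector's per-stream write config.
  public record StreamWriteConfig(String namespace, String tempTableName, Schema schema) {}

  public static void prepare(Catalog catalog, Collection<StreamWriteConfig> configs) {
    for (StreamWriteConfig config : configs) {
      TableIdentifier tempTableId =
          TableIdentifier.of(config.namespace(), config.tempTableName());
      if (!catalog.tableExists(tempTableId)) {
        // Fails immediately on bad credentials or permissions, not mid-sync.
        catalog.createTable(tempTableId, config.schema());
      }
    }
  }
}
```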
```java
public AirbyteConnectionStatus check(JsonNode config) {
  try {
    IcebergCatalogConfig icebergCatalogConfig = icebergCatalogConfigFactory.fromJsonNodeConfig(config);
    icebergCatalogConfig.check();
```
Not a big fan of performing business logic in a config class. Imo config classes should only contain methods for the initialization of objects. The cleanup logic is also error prone. One example: if you manage to create a table with

```java
Table tempTable = catalog.createTable(tempTableId, schema);
```

but somehow fail on

```java
try (CloseableIterable<Record> records = IcebergGenerics.read(tempTable).build()) {
  for (Record record : records) {
    // never reached
    log.info("Record in temp table: {}", record);
  }
}
```

then your cleanup logic with

```java
boolean dropSuccess = catalog.dropTable(tempTableId);
```

won't be executed, and for that reason such logic should be part of a finally {} block.

Btw, why do you read records instead of writing?
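A minimal sketch of the suggested shape, with the drop guaranteed by a finally block; the probe body and schema are illustrative, and the catalog wiring is assumed to happen elsewhere:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

// Sketch of the reviewer's suggestion: cleanup lives in finally, so the temp
// table is dropped whether the access probe succeeds or throws.
public final class ConnectionChecker {

  public static void check(Catalog catalog, TableIdentifier tempTableId) throws Exception {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.StringType.get()));
    Table tempTable = catalog.createTable(tempTableId, schema);
    try {
      // Exercise access here (read, or better, a small write); any failure
      // still reaches the finally block below.
      tempTable.newScan().planFiles().close();
    } finally {
      // Runs whether the probe above succeeded or threw.
      catalog.dropTable(tempTableId);
    }
  }
}
```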
/test connector=connectors/destination-iceberg
Build Failed. Test summary info:
Hey @Leibnizhu what type of setup/config do we need to get the tests in …
I have already written tests with …
@Leibnizhu okay, then please remove it so we can have a passing test run in the CI
/test connector=connectors/destination-iceberg
Build Passed. Test summary info:
```java
@Data
public class FormatConfig {

  public static final int DEFAULT_FLUSH_BATCH_SIZE = 10000;
```
Buffering records based on count alone is a bit risky, since 10_000 records can quickly exhaust your memory depending on their size. It is better if actual storage size is taken into consideration as well.
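One hedged sketch of what count-plus-size flushing could look like; the thresholds and the byte estimate are illustrative, not values from this PR:

```java
import java.nio.charset.StandardCharsets;

// Sketch of the size-aware buffering suggestion: flush on whichever bound is
// hit first, record count or (approximate) buffered bytes.
public final class FlushPolicy {

  private static final int MAX_BUFFERED_RECORDS = 10_000;
  private static final long MAX_BUFFERED_BYTES = 64L * 1024 * 1024; // 64 MiB

  private int bufferedRecords = 0;
  private long bufferedBytes = 0;

  /** Returns true when the caller should flush the current buffer. */
  public boolean accept(String serializedRecord) {
    bufferedRecords++;
    // Cheap estimate; real storage size depends on encoding and compression.
    bufferedBytes += serializedRecord.getBytes(StandardCharsets.UTF_8).length;
    return bufferedRecords >= MAX_BUFFERED_RECORDS || bufferedBytes >= MAX_BUFFERED_BYTES;
  }

  public void reset() {
    bufferedRecords = 0;
    bufferedBytes = 0;
  }
}
```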
Resolved (outdated) review comment on .../src/main/java/io/airbyte/integrations/destination/iceberg/config/format/DataFileFormat.java
Resolved (outdated) review comment on ...ation-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergConsumer.java
@marcosmarxm Overall this connector seems solid, with some improvement points from my side. I think we can proceed with merging.
Thanks @Leibnizhu
Force-pushed ("…package 3. add unit tests") from 88f3125 to dde1a8a.
thanks for reviewing. I will keep on improving these points in the next PR.
1. Create the temp table in startTracked().
2. Extract the Iceberg Spark SQL operations into IcebergOperations, out of IcebergConsumer and IcebergCatalogConfig.
3. In the check() method, create a temp table, then write something and read it back, and finally drop the table.
Trying to test the connector but getting the following error. Any idea?
2023-02-06 16:36:22 ERROR i.a.w.i.DefaultAirbyteStreamFactory(internalLog):163 - Exception attempting to access the Iceberg catalog:
@wahbiharibi is your Hive running normally? You could try another nonexistent database name, and the Iceberg connector will create it with an S3 location for you. For example, …
Thanks @Leibnizhu. It worked :)
Currently blocked by https://github.com/airbytehq/airbyte-internal-issues/issues/2695, waiting on a sandbox account.
I am not sure if this is the right place to post, but I was wondering if there is a way to change the catalog name in the Spark config file. I noticed that it is using the name iceberg based on the Constants value of CATALOG_NAME. Is there any way to make this configurable or add it to the spec? I am referring here to the JDBC catalog.
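A possible shape for such a change, sketched here with a hypothetical catalog_name spec field; that field does not exist in the current spec, and adding it is exactly what is being asked:

```java
import com.fasterxml.jackson.databind.JsonNode;

// Hypothetical sketch: read the catalog name from the connector config if
// present, falling back to the currently hard-coded constant.
public final class CatalogNameResolver {

  private static final String DEFAULT_CATALOG_NAME = "iceberg"; // Constants.CATALOG_NAME today

  public static String resolve(JsonNode config) {
    // "catalog_name" is a hypothetical spec field, not part of this PR.
    JsonNode node = config.get("catalog_name");
    return node != null && !node.isNull() ? node.asText() : DEFAULT_CATALOG_NAME;
  }
}
```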
What
Add Apache Iceberg Destination.
Closes #2833. Related to #6745.
How
Add Apache Iceberg Destination, implemented with Spark SQL (3.3.0) and iceberg-spark-runtime (1.0.0).
Spark runs in local mode.
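As a rough illustration of that setup, a local-mode SparkSession wired to an Iceberg catalog might look like the sketch below; the catalog name matches the hard-coded iceberg mentioned earlier in the thread, while the Hive URI and warehouse path are placeholders:

```java
import org.apache.spark.sql.SparkSession;

// Illustrative local-mode session roughly matching the setup described above:
// Spark 3.3 with iceberg-spark-runtime and a catalog named "iceberg".
public final class LocalSpark {

  public static SparkSession build() {
    return SparkSession.builder()
        .master("local[*]") // "Spark runs in local mode"
        .appName("destination-iceberg")
        .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.iceberg.type", "hive")
        .config("spark.sql.catalog.iceberg.uri", "thrift://localhost:9083")   // placeholder
        .config("spark.sql.catalog.iceberg.warehouse", "s3a://warehouse/path") // placeholder
        .getOrCreate();
  }
}
```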
Recommended reading order
1. spec.json
2. IcebergDestination.java
3. IcebergConsumer.java
🚨 User Impact 🚨
No breaking changes, only new destination.
Pre-merge Checklist
Expand the relevant checklist and delete the others.

Community member or Airbyter
- Secrets in the connector's spec are annotated with airbyte_secret
- ./gradlew :airbyte-integrations:connectors:<name>:integrationTest is passing
- Connector's README.md
- Connector's bootstrap.md. See description and examples
- docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
- docs/integrations/README.md
- airbyte-integrations/builds.md

Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- /test connector=connectors/<name> command is passing
- New connector version released on Docker Hub by running the /publish command described here

Tests
see https://gist.github.com/Leibnizhu/c3eaad1b04ad8a8becb77cda28b47313

execute:
./gradlew :airbyte-integrations:connectors:destination-iceberg:integrationTestJava --tests "*IntegrationTest"
see https://gist.github.com/Leibnizhu/33977e02c191e8d0da68c3a1af2bce57

execute:
see https://gist.github.com/Leibnizhu/c6c76cc9f42856795bb4895ee434849c