Skip to content

Commit f976b19

Browse files
committed
Destination Iceberg: Bump Iceberg from 1.1.0 to 1.3.0 and add REST catalog support
1 parent e10f768 commit f976b19

File tree

15 files changed

+580
-17
lines changed

15 files changed

+580
-17
lines changed

airbyte-integrations/connectors/destination-iceberg/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,5 @@ ENV JAVA_OPTS="--add-opens java.base/java.lang=ALL-UNNAMED \
2929

3030
COPY --from=build /airbyte /airbyte
3131

32-
LABEL io.airbyte.version=0.1.2
32+
LABEL io.airbyte.version=0.1.3
3333
LABEL io.airbyte.name=airbyte/destination-iceberg

airbyte-integrations/connectors/destination-iceberg/build.gradle

+12-12
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,24 @@ dependencies {
1414
implementation project(':airbyte-integrations:bases:base-java')
1515
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
1616

17-
implementation('org.apache.spark:spark-sql_2.13:3.3.1') {
17+
implementation('org.apache.spark:spark-sql_2.13:3.3.2') {
1818
exclude(group: 'org.apache.hadoop', module: 'hadoop-common')
1919
}
20-
implementation('org.apache.spark:spark-hive_2.13:3.3.1') {
20+
implementation('org.apache.spark:spark-hive_2.13:3.3.2') {
2121
exclude(group: 'org.apache.hadoop', module: 'hadoop-common')
2222
}
23-
implementation 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.1.0'
23+
implementation 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.13:1.3.0'
2424

2525
// force awssdk version required by Iceberg
26-
implementation "software.amazon.awssdk:utils:2.17.257"
27-
implementation "software.amazon.awssdk:url-connection-client:2.17.257"
28-
implementation "software.amazon.awssdk:s3:2.17.257"
29-
implementation "software.amazon.awssdk:glue:2.17.257"
30-
implementation "software.amazon.awssdk:dynamodb:2.17.257"
31-
implementation "software.amazon.awssdk:kms:2.17.257"
32-
implementation "software.amazon.awssdk:sts:2.17.257"
33-
implementation "software.amazon.awssdk:sdk-core:2.17.257"
34-
implementation "software.amazon.awssdk:aws-core:2.17.257"
26+
implementation "software.amazon.awssdk:utils:2.20.18"
27+
implementation "software.amazon.awssdk:url-connection-client:2.20.18"
28+
implementation "software.amazon.awssdk:s3:2.20.18"
29+
implementation "software.amazon.awssdk:glue:2.20.18"
30+
implementation "software.amazon.awssdk:dynamodb:2.20.18"
31+
implementation "software.amazon.awssdk:kms:2.20.18"
32+
implementation "software.amazon.awssdk:sts:2.20.18"
33+
implementation "software.amazon.awssdk:sdk-core:2.20.18"
34+
implementation "software.amazon.awssdk:aws-core:2.20.18"
3535

3636
implementation "org.apache.hadoop:hadoop-aws:3.3.2"
3737
implementation "org.apache.hadoop:hadoop-client-api:3.3.2"

airbyte-integrations/connectors/destination-iceberg/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ data:
22
connectorSubtype: database
33
connectorType: destination
44
definitionId: df65a8f3-9908-451b-aa9b-445462803560
5-
dockerImageTag: 0.1.2
5+
dockerImageTag: 0.1.3
66
dockerRepository: airbyte/destination-iceberg
77
githubIssueLabel: destination-iceberg
88
license: MIT

airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/IcebergConstants.java

+3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ public class IcebergConstants {
2727
public static final String JDBC_PASSWORD_CONFIG_KEY = "password";
2828
public static final String JDBC_SSL_CONFIG_KEY = "ssl";
2929
public static final String JDBC_CATALOG_SCHEMA_CONFIG_KEY = "catalog_schema";
30+
public static final String REST_CATALOG_URI_CONFIG_KEY = "rest_uri";
31+
public static final String REST_CATALOG_CREDENTIAL_CONFIG_KEY = "rest_credential";
32+
public static final String REST_CATALOG_TOKEN_CONFIG_KEY = "rest_token";
3033

3134
/**
3235
* Storage Config keys

airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/CatalogType.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@
1010
public enum CatalogType {
1111
HIVE,
1212
HADOOP,
13-
JDBC
13+
JDBC,
14+
REST
1415
}

airbyte-integrations/connectors/destination-iceberg/src/main/java/io/airbyte/integrations/destination/iceberg/config/catalog/IcebergCatalogConfigFactory.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ public IcebergCatalogConfig fromJsonNodeConfig(@Nonnull final JsonNode config) {
3838
IcebergCatalogConfig icebergCatalogConfig = genIcebergCatalogConfig(catalogConfigJson);
3939
icebergCatalogConfig.formatConfig = formatConfig;
4040
icebergCatalogConfig.storageConfig = storageConfig;
41-
icebergCatalogConfig.setDefaultOutputDatabase(catalogConfigJson.get(DEFAULT_DATABASE_CONFIG_KEY).asText());
41+
JsonNode defaultDb = catalogConfigJson.get(DEFAULT_DATABASE_CONFIG_KEY);
42+
if (null != defaultDb) {
43+
icebergCatalogConfig.setDefaultOutputDatabase(defaultDb.asText());
44+
}
4245

4346
return icebergCatalogConfig;
4447
}
@@ -70,6 +73,7 @@ private static IcebergCatalogConfig genIcebergCatalogConfig(@NotNull JsonNode ca
7073
case HIVE -> new HiveCatalogConfig(catalogConfigJson);
7174
case HADOOP -> new HadoopCatalogConfig(catalogConfigJson);
7275
case JDBC -> new JdbcCatalogConfig(catalogConfigJson);
76+
case REST -> new RESTCatalogConfig(catalogConfigJson);
7377
default -> throw new RuntimeException("Unexpected catalog config: " + catalogTypeStr);
7478
};
7579
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.iceberg.config.catalog;
6+
7+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.CATALOG_NAME;
8+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.REST_CATALOG_CREDENTIAL_CONFIG_KEY;
9+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.REST_CATALOG_TOKEN_CONFIG_KEY;
10+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.REST_CATALOG_URI_CONFIG_KEY;
11+
import static org.apache.commons.lang3.StringUtils.isNotBlank;
12+
13+
import com.fasterxml.jackson.databind.JsonNode;
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
17+
import com.google.common.base.Preconditions;
18+
import lombok.Data;
19+
import lombok.EqualsAndHashCode;
20+
import lombok.ToString;
21+
import org.apache.iceberg.CatalogProperties;
22+
import org.apache.iceberg.catalog.Catalog;
23+
import org.apache.iceberg.rest.RESTCatalog;
24+
import org.apache.iceberg.rest.auth.OAuth2Properties;
25+
import org.jetbrains.annotations.NotNull;
26+
27+
@Data
28+
@ToString(callSuper = true)
29+
@EqualsAndHashCode(callSuper = false)
30+
public class RESTCatalogConfig
31+
extends IcebergCatalogConfig {
32+
33+
private final String uri;
34+
private final String credential;
35+
private final String token;
36+
37+
public RESTCatalogConfig(@NotNull JsonNode catalogConfig) {
38+
Preconditions.checkArgument(null != catalogConfig.get(REST_CATALOG_URI_CONFIG_KEY), "%s is required", REST_CATALOG_URI_CONFIG_KEY);
39+
this.uri = catalogConfig.get(REST_CATALOG_URI_CONFIG_KEY).asText();
40+
JsonNode credentialNode = catalogConfig.get(REST_CATALOG_CREDENTIAL_CONFIG_KEY);
41+
JsonNode tokenNode = catalogConfig.get(REST_CATALOG_TOKEN_CONFIG_KEY);
42+
this.credential = null != credentialNode ? credentialNode.asText() : null;
43+
this.token = null != tokenNode ? tokenNode.asText() : null;
44+
}
45+
46+
@Override
47+
public Map<String, String> sparkConfigMap() {
48+
Map<String, String> configMap = new HashMap<>();
49+
configMap.put("spark.network.timeout", "300000");
50+
configMap.put("spark.sql.defaultCatalog", CATALOG_NAME);
51+
configMap.put("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
52+
configMap.put("spark.sql.catalog." + CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog");
53+
configMap.put("spark.sql.catalog." + CATALOG_NAME + ".catalog-impl", "org.apache.iceberg.rest.RESTCatalog");
54+
configMap.put("spark.sql.catalog." + CATALOG_NAME + ".uri", this.uri);
55+
configMap.put("spark.driver.extraJavaOptions", "-Dpackaging.type=jar -Djava.io.tmpdir=/tmp");
56+
57+
if (isNotBlank(this.credential)) {
58+
configMap.put("spark.sql.catalog." + CATALOG_NAME + ".credential", this.credential);
59+
}
60+
if (isNotBlank(this.token)) {
61+
configMap.put("spark.sql.catalog." + CATALOG_NAME + ".token", this.token);
62+
}
63+
64+
configMap.putAll(this.storageConfig.sparkConfigMap(CATALOG_NAME));
65+
return configMap;
66+
}
67+
68+
@Override
69+
public Catalog genCatalog() {
70+
RESTCatalog catalog = new RESTCatalog();
71+
Map<String, String> properties = new HashMap<>(this.storageConfig.catalogInitializeProperties());
72+
properties.put(CatalogProperties.URI, this.uri);
73+
if (isNotBlank(this.credential)) {
74+
properties.put(OAuth2Properties.CREDENTIAL, this.credential);
75+
}
76+
if (isNotBlank(this.token)) {
77+
properties.put(OAuth2Properties.TOKEN, this.token);
78+
}
79+
catalog.initialize(CATALOG_NAME, properties);
80+
return catalog;
81+
}
82+
}

airbyte-integrations/connectors/destination-iceberg/src/main/resources/spec.json

+34
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,40 @@
118118
"order": 6
119119
}
120120
}
121+
},
122+
{
123+
"title": "RESTCatalog",
124+
"description": "The RESTCatalog connects to a REST server at the specified URI",
125+
"required": ["catalog_type", "rest_uri"],
126+
"properties": {
127+
"catalog_type": {
128+
"title": "Catalog Type",
129+
"type": "string",
130+
"default": "Rest",
131+
"enum": ["Rest"],
132+
"order": 0
133+
},
134+
"rest_uri": {
135+
"title": "REST Server URI",
136+
"type": "string",
137+
"examples": ["http://localhost:12345"],
138+
"order": 1
139+
},
140+
"rest_credential": {
141+
"title": "A credential to exchange for a token in the OAuth2 client credentials flow.",
142+
"type": "string",
143+
"airbyte_secret": true,
144+
"examples": ["username:password"],
145+
"order": 2
146+
},
147+
"rest_token": {
148+
"title": "A Bearer token which will be used for interaction with the server.",
149+
"type": "string",
150+
"airbyte_secret": true,
151+
"examples": ["eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"],
152+
"order": 3
153+
}
154+
}
121155
}
122156
],
123157
"order": 0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.iceberg.container;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.airbyte.commons.json.Jsons;
9+
import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat;
10+
import io.airbyte.integrations.destination.iceberg.hive.IcebergHiveCatalogS3ParquetIntegrationTest;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import org.testcontainers.containers.DockerComposeContainer;
14+
import org.testcontainers.containers.wait.strategy.Wait;
15+
16+
import java.nio.file.Path;
17+
import java.time.Duration;
18+
import java.util.Map;
19+
20+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_CATALOG_CONFIG_KEY;
21+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_CATALOG_TYPE_CONFIG_KEY;
22+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_FORMAT_CONFIG_KEY;
23+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_STORAGE_CONFIG_KEY;
24+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.ICEBERG_STORAGE_TYPE_CONFIG_KEY;
25+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.REST_CATALOG_URI_CONFIG_KEY;
26+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ACCESS_KEY_ID_CONFIG_KEY;
27+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_BUCKET_REGION_CONFIG_KEY;
28+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_ENDPOINT_CONFIG_KEY;
29+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_SECRET_KEY_CONFIG_KEY;
30+
import static io.airbyte.integrations.destination.iceberg.IcebergConstants.S3_WAREHOUSE_URI_CONFIG_KEY;
31+
import static java.util.Map.entry;
32+
import static java.util.Map.ofEntries;
33+
34+
public class RESTServerWithMinioCompose extends DockerComposeContainer<RESTServerWithMinioCompose> {
35+
36+
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergHiveCatalogS3ParquetIntegrationTest.class);
37+
private static final String LOCAL_RELATIVE_PATH = "src/test-integration/resources/";
38+
private static final String COMPOSE_PATH = LOCAL_RELATIVE_PATH + "rest-catalog-compose.yml";
39+
private static final int REST_SERVER_PORT = 8181;
40+
private static final int MINIO_PORT = 9000;
41+
private static final String REST_SERVICE_NAME = "rest_1";
42+
private static final String MINIO_SERVICE_NAME = "minio_1";
43+
44+
public RESTServerWithMinioCompose() {
45+
super(Path.of(COMPOSE_PATH).toFile());
46+
super.withExposedService(REST_SERVICE_NAME,
47+
REST_SERVER_PORT,
48+
Wait.forListeningPort().withStartupTimeout(Duration.ofSeconds(30)))
49+
.withExposedService(MINIO_SERVICE_NAME,
50+
MinioContainer.MINIO_PORT,
51+
Wait.forHttp(MinioContainer.HEALTH_ENDPOINT).withStartupTimeout(Duration.ofSeconds(60)))
52+
.withLocalCompose(true);
53+
}
54+
55+
@Override
56+
public void start() {
57+
long startTime = System.currentTimeMillis();
58+
super.start();
59+
LOGGER.info("REST Server port: {}", getServicePort(REST_SERVICE_NAME, REST_SERVER_PORT));
60+
LOGGER.info("Minio port: {}", getServicePort(MINIO_SERVICE_NAME, MINIO_PORT));
61+
LOGGER.info("REST Server docker-compose startup cost: {} ms", System.currentTimeMillis() - startTime);
62+
}
63+
64+
public String s3Endpoint() {
65+
return "http://localhost:" + getServicePort(MINIO_SERVICE_NAME, MINIO_PORT);
66+
}
67+
68+
public String restServerUri() {
69+
return "http://localhost:" + getServicePort(REST_SERVICE_NAME, REST_SERVER_PORT);
70+
}
71+
72+
public JsonNode getComposeConfig(DataFileFormat fileFormat) {
73+
String s3Endpoint = this.s3Endpoint();
74+
LOGGER.info("Configure S3 endpoint to {}", s3Endpoint);
75+
return Jsons.jsonNode(ofEntries(
76+
entry(ICEBERG_CATALOG_CONFIG_KEY,
77+
Jsons.jsonNode(ofEntries(
78+
entry(ICEBERG_CATALOG_TYPE_CONFIG_KEY, "Rest"),
79+
entry(REST_CATALOG_URI_CONFIG_KEY, this.restServerUri())))),
80+
entry(ICEBERG_STORAGE_CONFIG_KEY,
81+
Jsons.jsonNode(ofEntries(
82+
entry(ICEBERG_STORAGE_TYPE_CONFIG_KEY, "S3"),
83+
entry(S3_ACCESS_KEY_ID_CONFIG_KEY, "admin"),
84+
entry(S3_SECRET_KEY_CONFIG_KEY, "password"),
85+
entry(S3_WAREHOUSE_URI_CONFIG_KEY, "s3://warehouse/rest"),
86+
entry(S3_BUCKET_REGION_CONFIG_KEY, "us-east-1"),
87+
entry(S3_ENDPOINT_CONFIG_KEY, s3Endpoint)))),
88+
entry(ICEBERG_FORMAT_CONFIG_KEY,
89+
Jsons.jsonNode(Map.of("format", fileFormat.getConfigValue())))));
90+
}
91+
92+
public JsonNode getWrongConfig() {
93+
return Jsons.jsonNode(ofEntries(
94+
entry(ICEBERG_CATALOG_CONFIG_KEY,
95+
Jsons.jsonNode(ofEntries(
96+
entry(ICEBERG_CATALOG_TYPE_CONFIG_KEY, "Rest"),
97+
entry(REST_CATALOG_URI_CONFIG_KEY, "wrong-host:1234")))),
98+
entry(ICEBERG_STORAGE_CONFIG_KEY,
99+
Jsons.jsonNode(ofEntries(entry(ICEBERG_STORAGE_TYPE_CONFIG_KEY, "S3"),
100+
entry(S3_ACCESS_KEY_ID_CONFIG_KEY, "wrong_access_key"),
101+
entry(S3_SECRET_KEY_CONFIG_KEY, "wrong_secret_key"),
102+
entry(S3_WAREHOUSE_URI_CONFIG_KEY, "s3://warehouse/"),
103+
entry(S3_BUCKET_REGION_CONFIG_KEY, "us-east-1"),
104+
entry(S3_ENDPOINT_CONFIG_KEY, this.s3Endpoint())))),
105+
entry(ICEBERG_FORMAT_CONFIG_KEY, Jsons.jsonNode(Map.of("format", "wrong-format")))));
106+
}
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.iceberg.rest;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil;
9+
import io.airbyte.integrations.destination.iceberg.config.format.DataFileFormat;
10+
import io.airbyte.integrations.destination.iceberg.container.RESTServerWithMinioCompose;
11+
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
12+
import org.junit.jupiter.api.AfterAll;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import java.util.List;
17+
18+
import static io.airbyte.integrations.destination.iceberg.IcebergIntegrationTestUtil.ICEBERG_IMAGE_NAME;
19+
20+
public abstract class BaseIcebergRESTCatalogS3IntegrationTest extends DestinationAcceptanceTest {
21+
22+
private static final Logger LOGGER = LoggerFactory.getLogger(BaseIcebergRESTCatalogS3IntegrationTest.class);
23+
24+
private static RESTServerWithMinioCompose composeContainer;
25+
private static JsonNode config;
26+
27+
static void startCompose(DataFileFormat fileFormat) {
28+
composeContainer = new RESTServerWithMinioCompose();
29+
composeContainer.start();
30+
config = composeContainer.getComposeConfig(fileFormat);
31+
IcebergIntegrationTestUtil.createS3WarehouseBucket(config);
32+
LOGGER.info("==> Started REST Server with Minio - Docker Compose...");
33+
}
34+
35+
@AfterAll
36+
public static void stopCompose() {
37+
IcebergIntegrationTestUtil.stopAndCloseContainer(composeContainer, "REST Server with Minio - Docker Compose");
38+
}
39+
40+
@Override
41+
protected void setup(final TestDestinationEnv testEnv) {}
42+
43+
@Override
44+
protected void tearDown(final TestDestinationEnv testEnv) {}
45+
46+
@Override
47+
protected String getImageName() {
48+
return ICEBERG_IMAGE_NAME;
49+
}
50+
51+
@Override
52+
protected JsonNode getConfig() {
53+
return config;
54+
}
55+
56+
@Override
57+
protected JsonNode getFailCheckConfig() {
58+
return composeContainer.getWrongConfig();
59+
}
60+
61+
@Override
62+
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
63+
String streamName,
64+
String namespace,
65+
JsonNode streamSchema)
66+
throws Exception {
67+
return IcebergIntegrationTestUtil.retrieveRecords(getConfig(), namespace, streamName);
68+
}
69+
}

0 commit comments

Comments
 (0)