Skip to content

Commit 98236d6

Browse files
etsybaevjhammarstedt
authored andcommitted
🎉Destination-elasticsearch: added custom sertificate support (airbytehq#18177)
* [11356] Destination-elasticsearch: added custom certificate support
1 parent 58fa78c commit 98236d6

File tree

14 files changed

+102
-132
lines changed

14 files changed

+102
-132
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@
100100
- destinationDefinitionId: 68f351a7-2745-4bef-ad7f-996b8e51bb8c
101101
name: ElasticSearch
102102
dockerRepository: airbyte/destination-elasticsearch
103-
dockerImageTag: 0.1.4
103+
dockerImageTag: 0.1.5
104104
documentationUrl: https://docs.airbyte.com/integrations/destinations/elasticsearch
105105
icon: elasticsearch.svg
106106
releaseStage: alpha

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1700,7 +1700,7 @@
17001700
supported_destination_sync_modes:
17011701
- "overwrite"
17021702
- "append"
1703-
- dockerImage: "airbyte/destination-elasticsearch:0.1.4"
1703+
- dockerImage: "airbyte/destination-elasticsearch:0.1.5"
17041704
spec:
17051705
documentationUrl: "https://docs.airbyte.com/integrations/destinations/elasticsearch"
17061706
connectionSpecification:
@@ -1722,6 +1722,12 @@
17221722
\ will be performed using the primary key value as the elasticsearch doc\
17231723
\ id. Does not support composite primary keys."
17241724
default: true
1725+
ca_certificate:
1726+
type: "string"
1727+
title: "CA certificate"
1728+
description: "CA certificate"
1729+
airbyte_secret: true
1730+
multiline: true
17251731
authenticationMethod:
17261732
title: "Authentication Method"
17271733
type: "object"

airbyte-db/db-lib/src/main/java/io/airbyte/db/util/SSLCertificateUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
import java.util.Objects;
2929
import java.util.Random;
3030
import java.util.concurrent.TimeUnit;
31+
import javax.net.ssl.SSLContext;
32+
import org.apache.http.ssl.SSLContextBuilder;
33+
import org.apache.http.ssl.SSLContexts;
3134
import org.slf4j.Logger;
3235
import org.slf4j.LoggerFactory;
3336

@@ -160,4 +163,21 @@ public static URI keyStoreFromClientCertificate(
160163
return keyStoreFromClientCertificate(certString, keyString, keyStorePassword, FileSystems.getDefault(), directory);
161164
}
162165

166+
public static SSLContext createContextFromCaCert(String caCertificate) {
167+
try {
168+
CertificateFactory factory = CertificateFactory.getInstance(X509);
169+
Certificate trustedCa = factory.generateCertificate(
170+
new ByteArrayInputStream(caCertificate.getBytes(StandardCharsets.UTF_8))
171+
);
172+
KeyStore trustStore = KeyStore.getInstance(PKCS_12);
173+
trustStore.load(null, null);
174+
trustStore.setCertificateEntry("ca", trustedCa);
175+
SSLContextBuilder sslContextBuilder =
176+
SSLContexts.custom().loadTrustMaterial(trustStore, null);
177+
return sslContextBuilder.build();
178+
} catch (Exception e) {
179+
throw new RuntimeException(e);
180+
}
181+
}
182+
163183
}

airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION destination-elasticsearch-strict-encrypt
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.1.4
19+
LABEL io.airbyte.version=0.1.5
2020
LABEL io.airbyte.name=airbyte/destination-elasticsearch-strict-encrypt

airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchStrictEncryptDestinationAcceptanceTest.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,43 @@
44

55
package io.airbyte.integrations.destination.elasticsearch;
66

7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
79
import com.fasterxml.jackson.databind.JsonNode;
810
import com.fasterxml.jackson.databind.ObjectMapper;
911
import com.google.common.collect.ImmutableMap;
1012
import io.airbyte.commons.json.Jsons;
13+
import io.airbyte.config.StandardCheckConnectionOutput.Status;
1114
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
1215
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
1316
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
1417
import java.io.IOException;
18+
import java.io.InputStream;
19+
import java.nio.charset.StandardCharsets;
20+
import java.time.Duration;
1521
import java.util.List;
1622
import java.util.Map;
1723
import org.junit.jupiter.api.AfterAll;
1824
import org.junit.jupiter.api.BeforeAll;
25+
import org.junit.jupiter.api.Test;
1926
import org.testcontainers.elasticsearch.ElasticsearchContainer;
2027

2128
public class ElasticsearchStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {
2229

23-
private final ObjectMapper mapper = new ObjectMapper();
2430
private static ElasticsearchContainer container;
31+
private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:8.3.3";
32+
private final ObjectMapper mapper = new ObjectMapper();
2533

2634
@BeforeAll
2735
public static void beforeAll() {
2836

29-
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.15.1")
30-
.withPassword("MagicWord");
37+
container = new ElasticsearchContainer(IMAGE_NAME)
38+
.withEnv("discovery.type", "single-node")
39+
.withEnv("network.host", "0.0.0.0")
40+
.withEnv("logger.org.elasticsearch", "INFO")
41+
.withEnv("ingest.geoip.downloader.enabled", "false")
42+
.withExposedPorts(9200)
43+
.withPassword("s3cret");
3144

3245
container.start();
3346
}
@@ -84,11 +97,14 @@ protected JsonNode getConfig() {
8497
final JsonNode authConfig = Jsons.jsonNode(Map.of(
8598
"method", "basic",
8699
"username", "elastic",
87-
"password", "MagicWord"));
100+
"password", "s3cret"));
88101

89102
return Jsons.jsonNode(ImmutableMap.builder()
90-
.put("endpoint", String.format("http://%s:%s", container.getHost(), container.getMappedPort(9200)))
103+
.put("endpoint", String.format("https://%s:%s", container.getHost(), container.getMappedPort(9200)))
91104
.put("authenticationMethod", authConfig)
105+
.put("ca_certificate", new String(container.copyFileFromContainer(
106+
"/usr/share/elasticsearch/config/certs/http_ca.crt",
107+
InputStream::readAllBytes), StandardCharsets.UTF_8))
92108
.build());
93109
}
94110

airbyte-integrations/connectors/destination-elasticsearch-strict-encrypt/src/test/resources/expected_spec.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@
2323
"description": "If a primary key identifier is defined in the source, an upsert will be performed using the primary key value as the elasticsearch doc id. Does not support composite primary keys.",
2424
"default": true
2525
},
26+
"ca_certificate": {
27+
"type": "string",
28+
"title": "CA certificate",
29+
"description": "CA certificate",
30+
"airbyte_secret": true,
31+
"multiline": true
32+
},
2633
"authenticationMethod": {
2734
"title": "Authentication Method",
2835
"type": "object",

airbyte-integrations/connectors/destination-elasticsearch/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION destination-elasticsearch
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.1.4
19+
LABEL io.airbyte.version=0.1.5
2020
LABEL io.airbyte.name=airbyte/destination-elasticsearch

airbyte-integrations/connectors/destination-elasticsearch/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ application {
1010
}
1111

1212
dependencies {
13+
implementation project(':airbyte-db:db-lib')
1314
implementation project(':airbyte-config:config-models')
1415
implementation project(':airbyte-protocol:protocol-models')
1516
implementation project(':airbyte-integrations:bases:base-java')

airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ConnectorConfiguration.java

Lines changed: 6 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -5,71 +5,28 @@
55
package io.airbyte.integrations.destination.elasticsearch;
66

77
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
8+
import com.fasterxml.jackson.annotation.JsonProperty;
89
import com.fasterxml.jackson.databind.JsonNode;
910
import com.fasterxml.jackson.databind.ObjectMapper;
1011
import java.util.Objects;
12+
import lombok.Data;
1113

1214
@JsonIgnoreProperties(ignoreUnknown = true)
15+
@Data
1316
public class ConnectorConfiguration {
1417

1518
private String endpoint;
1619
private boolean upsert;
20+
@JsonProperty("ca_certificate")
21+
private String caCertificate;
1722
private AuthenticationMethod authenticationMethod = new AuthenticationMethod();
1823

19-
public ConnectorConfiguration() {}
20-
2124
public static ConnectorConfiguration fromJsonNode(JsonNode config) {
2225
return new ObjectMapper().convertValue(config, ConnectorConfiguration.class);
2326
}
2427

25-
public String getEndpoint() {
26-
return this.endpoint;
27-
}
28-
29-
public boolean isUpsert() {
30-
return this.upsert;
31-
}
32-
33-
public AuthenticationMethod getAuthenticationMethod() {
34-
return this.authenticationMethod;
35-
}
36-
37-
public void setEndpoint(String endpoint) {
38-
this.endpoint = endpoint;
39-
}
40-
41-
public void setUpsert(boolean upsert) {
42-
this.upsert = upsert;
43-
}
44-
45-
public void setAuthenticationMethod(AuthenticationMethod authenticationMethod) {
46-
this.authenticationMethod = authenticationMethod;
47-
}
48-
49-
@Override
50-
public boolean equals(Object o) {
51-
if (this == o)
52-
return true;
53-
if (o == null || getClass() != o.getClass())
54-
return false;
55-
ConnectorConfiguration that = (ConnectorConfiguration) o;
56-
return upsert == that.upsert && Objects.equals(endpoint, that.endpoint) && Objects.equals(authenticationMethod, that.authenticationMethod);
57-
}
58-
59-
@Override
60-
public int hashCode() {
61-
return Objects.hash(endpoint, upsert, authenticationMethod);
62-
}
63-
64-
@Override
65-
public String toString() {
66-
return "ConnectorConfiguration{" +
67-
"endpoint='" + endpoint + '\'' +
68-
", upsert=" + upsert +
69-
", authenticationMethod=" + authenticationMethod +
70-
'}';
71-
}
7228

29+
@Data
7330
static class AuthenticationMethod {
7431

7532
private ElasticsearchAuthenticationMethod method = ElasticsearchAuthenticationMethod.none;
@@ -78,46 +35,6 @@ static class AuthenticationMethod {
7835
private String apiKeyId;
7936
private String apiKeySecret;
8037

81-
public ElasticsearchAuthenticationMethod getMethod() {
82-
return this.method;
83-
}
84-
85-
public String getUsername() {
86-
return this.username;
87-
}
88-
89-
public String getPassword() {
90-
return this.password;
91-
}
92-
93-
public String getApiKeyId() {
94-
return this.apiKeyId;
95-
}
96-
97-
public String getApiKeySecret() {
98-
return this.apiKeySecret;
99-
}
100-
101-
public void setMethod(ElasticsearchAuthenticationMethod method) {
102-
this.method = method;
103-
}
104-
105-
public void setUsername(String username) {
106-
this.username = username;
107-
}
108-
109-
public void setPassword(String password) {
110-
this.password = password;
111-
}
112-
113-
public void setApiKeyId(String apiKeyId) {
114-
this.apiKeyId = apiKeyId;
115-
}
116-
117-
public void setApiKeySecret(String apiKeySecret) {
118-
this.apiKeySecret = apiKeySecret;
119-
}
120-
12138
public boolean isValid() {
12239
return switch (this.method) {
12340
case none -> true;
@@ -126,34 +43,6 @@ public boolean isValid() {
12643
};
12744
}
12845

129-
@Override
130-
public boolean equals(Object o) {
131-
if (this == o)
132-
return true;
133-
if (o == null || getClass() != o.getClass())
134-
return false;
135-
AuthenticationMethod that = (AuthenticationMethod) o;
136-
return method == that.method &&
137-
Objects.equals(username, that.username) &&
138-
Objects.equals(password, that.password) &&
139-
Objects.equals(apiKeyId, that.apiKeyId) &&
140-
Objects.equals(apiKeySecret, that.apiKeySecret);
141-
}
142-
143-
@Override
144-
public int hashCode() {
145-
return Objects.hash(method, username, password, apiKeyId, apiKeySecret);
146-
}
147-
148-
@Override
149-
public String toString() {
150-
return "AuthenticationMethod{" +
151-
"method=" + method +
152-
", username='" + username + '\'' +
153-
", apiKeyId='" + apiKeyId + '\'' +
154-
'}';
155-
}
156-
15746
}
15847

15948
}

airbyte-integrations/connectors/destination-elasticsearch/src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.fasterxml.jackson.core.JsonPointer;
1818
import com.fasterxml.jackson.databind.JsonNode;
1919
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import io.airbyte.db.util.SSLCertificateUtils;
2021
import io.airbyte.protocol.models.AirbyteRecordMessage;
2122
import jakarta.json.JsonValue;
2223
import java.io.IOException;
@@ -28,6 +29,7 @@
2829
import org.apache.http.message.BasicHeader;
2930
import org.elasticsearch.client.Node;
3031
import org.elasticsearch.client.RestClient;
32+
import org.elasticsearch.client.RestClientBuilder;
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
3335

@@ -56,7 +58,17 @@ public ElasticsearchConnection(ConnectorConfiguration config) {
5658

5759
// Create the low-level client
5860
httpHost = HttpHost.create(config.getEndpoint());
59-
restClient = RestClient.builder(httpHost)
61+
final RestClientBuilder builder = RestClient.builder(httpHost);
62+
63+
// Set custom user's certificate if provided
64+
if (config.getCaCertificate() != null && !config.getCaCertificate().isEmpty()){
65+
builder.setHttpClientConfigCallback(clientBuilder -> {
66+
clientBuilder.setSSLContext(SSLCertificateUtils.createContextFromCaCert(config.getCaCertificate()));
67+
return clientBuilder;
68+
});
69+
}
70+
71+
restClient = builder
6072
.setDefaultHeaders(configureHeaders(config))
6173
.setFailureListener(new FailureListener())
6274
.build();

0 commit comments

Comments
 (0)