-
Notifications
You must be signed in to change notification settings - Fork 4.6k
🎉 Source MySql: Added SSL certificates to MySql Source #15044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 29 commits
Commits
Show all changes
31 commits
Select commit
Hold shift + click to select a range
92b8b8f
updated mysql source specification and added field for root and clien…
andriikorotkov 93f7913
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov 7178243
added SSL mode for mysql source
andriikorotkov f5bd946
fixed code style
andriikorotkov cfe583d
updated run process timeout
andriikorotkov 6e2dbb3
updated method for create keystore and updated tests
andriikorotkov b872f43
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov ea08e2b
updated normalization version for postgres destination
andriikorotkov ac0f651
updated normalization version for postgres destination
andriikorotkov a10374d
added tests for connection with certificates
andriikorotkov 67d67ae
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov 9cc861d
updated tests for connection with full certificates and added tests f…
andriikorotkov 2575317
updated tests
andriikorotkov bd1fcb5
updated source-mysql-strict-encrypt and updated versions
andriikorotkov 95d6545
updated code style
andriikorotkov 73bd508
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov 5ccdc64
updated doc
andriikorotkov 15b31b4
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov 6c5a9a9
updated specs
andriikorotkov 9109ec1
fixed minor remarks
andriikorotkov 3c908c9
fixed minor remarks
andriikorotkov 0e1c0cb
updated tests
andriikorotkov 2c47c57
fixed remarks and updated specification
andriikorotkov 135ae36
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov 2418e98
fixed mysql sources connectors version
andriikorotkov 132c7ce
added CDC + SSL Certificates tests
andriikorotkov 5503174
added property for CDC and added tests for test SSL with CDC together
andriikorotkov 5f2dad7
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov 9e732d7
fixed MySqlStrictEncryptJdbcSourceAcceptanceTest for work with dateti…
andriikorotkov 5c2d5aa
added property for CDC and added tests for test SSL with CDC together
andriikorotkov 099f89d
auto-bump connector version [ci skip]
octavia-squidington-iii File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
74 changes: 74 additions & 0 deletions
74
airbyte-db/db-lib/src/main/java/io/airbyte/db/MySqlUtils.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.db; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import java.io.IOException; | ||
import org.testcontainers.containers.MySQLContainer; | ||
|
||
public class MySqlUtils { | ||
|
||
@VisibleForTesting | ||
public static Certificate getCertificate(final MySQLContainer<?> container, | ||
final boolean useAllCertificates) | ||
throws IOException, InterruptedException { | ||
// add root and server certificates to config file | ||
container.execInContainer("sh", "-c", "sed -i '31 a ssl' /etc/my.cnf"); | ||
container.execInContainer("sh", "-c", "sed -i '32 a ssl-ca=/var/lib/mysql/ca.pem' /etc/my.cnf"); | ||
container.execInContainer("sh", "-c", "sed -i '33 a ssl-cert=/var/lib/mysql/server-cert.pem' /etc/my.cnf"); | ||
container.execInContainer("sh", "-c", "sed -i '34 a ssl-key=/var/lib/mysql/server-key.pem' /etc/my.cnf"); | ||
container.execInContainer("sh", "-c", "sed -i '35 a require_secure_transport=ON' /etc/my.cnf"); | ||
// add client certificates to config file | ||
if (useAllCertificates) { | ||
container.execInContainer("sh", "-c", "sed -i '39 a [client]' /etc/mysql/my.cnf"); | ||
container.execInContainer("sh", "-c", "sed -i '40 a ssl-ca=/var/lib/mysql/ca.pem' /etc/my.cnf"); | ||
container.execInContainer("sh", "-c", "sed -i '41 a ssl-cert=/var/lib/mysql/client-cert.pem' /etc/my.cnf"); | ||
container.execInContainer("sh", "-c", "sed -i '42 a ssl-key=/var/lib/mysql/client-key.pem' /etc/my.cnf"); | ||
} | ||
// copy root certificate and client certificates | ||
var caCert = container.execInContainer("sh", "-c", "cat /var/lib/mysql/ca.pem").getStdout().trim(); | ||
|
||
if (useAllCertificates) { | ||
var clientKey = container.execInContainer("sh", "-c", "cat /var/lib/mysql/client-key.pem").getStdout().trim(); | ||
var clientCert = container.execInContainer("sh", "-c", "cat /var/lib/mysql/client-cert.pem").getStdout().trim(); | ||
return new Certificate(caCert, clientCert, clientKey); | ||
} else { | ||
return new Certificate(caCert); | ||
} | ||
} | ||
|
||
public static class Certificate { | ||
|
||
private final String caCertificate; | ||
private final String clientCertificate; | ||
private final String clientKey; | ||
|
||
public Certificate(final String caCertificate) { | ||
this.caCertificate = caCertificate; | ||
this.clientCertificate = null; | ||
this.clientKey = null; | ||
} | ||
|
||
public Certificate(final String caCertificate, final String clientCertificate, final String clientKey) { | ||
this.caCertificate = caCertificate; | ||
this.clientCertificate = clientCertificate; | ||
this.clientKey = clientKey; | ||
} | ||
|
||
public String getCaCertificate() { | ||
return caCertificate; | ||
} | ||
|
||
public String getClientCertificate() { | ||
return clientCertificate; | ||
} | ||
|
||
public String getClientKey() { | ||
return clientKey; | ||
} | ||
|
||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
205 changes: 205 additions & 0 deletions
205
...s/bases/base-java/src/main/java/io/airbyte/integrations/util/MySqlSslConnectionUtils.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.util; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.File; | ||
import java.io.FileReader; | ||
import java.io.IOException; | ||
import java.io.PrintWriter; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.commons.lang3.RandomStringUtils; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class MySqlSslConnectionUtils { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSslConnectionUtils.class); | ||
|
||
public static final String PARAM_MODE = "mode"; | ||
public static final String PARAM_CLIENT_KEY_PASSWORD = "client_key_password"; | ||
public static final String PARAM_CA_CERTIFICATE = "ca_certificate"; | ||
public static final String PARAM_CLIENT_CERTIFICATE = "client_certificate"; | ||
public static final String PARAM_CLIENT_KEY = "client_key"; | ||
public static final String TRUST_KEY_STORE_URL = "trustCertificateKeyStoreUrl"; | ||
public static final String TRUST_KEY_STORE_PASS = "trustCertificateKeyStorePassword"; | ||
public static final String CLIENT_KEY_STORE_URL = "clientCertificateKeyStoreUrl"; | ||
public static final String CLIENT_KEY_STORE_PASS = "clientCertificateKeyStorePassword"; | ||
public static final String CUSTOM_TRUST_STORE = "customtruststore.jks"; | ||
public static final String CUSTOM_KEY_STORE = "customkeystore.jks"; | ||
public static final String SSL_MODE = "sslMode"; | ||
public static final String VERIFY_CA = "VERIFY_CA"; | ||
public static final String VERIFY_IDENTITY = "VERIFY_IDENTITY"; | ||
public static final String ROOT_CERTIFICARE_NAME = "ca-cert.pem"; | ||
public static final String ROOT_CERTIFICARE_DER_NAME = "ca-cert.der"; | ||
public static final String CLIENT_CERTIFICARE_NAME = "client-cert.pem"; | ||
public static final String CLIENT_KEY_NAME = "client-key.pem"; | ||
public static final String CLIENT_CERT_P12 = "certificate.p12"; | ||
public static final String ENCRYPT_FILE_NAME = "encrypt"; | ||
|
||
public static Map<String, String> obtainConnection(final JsonNode encryption) { | ||
Map<String, String> additionalParameters = new HashMap<>(); | ||
if (!encryption.isNull()) { | ||
final var method = encryption.get(PARAM_MODE).asText().toUpperCase(); | ||
var keyStorePassword = checkOrCreatePassword(encryption); | ||
if (method.equals(VERIFY_CA) || method.equals(VERIFY_IDENTITY)) { | ||
additionalParameters.putAll(checkCertificatesAndObtainConnection(encryption, method, keyStorePassword)); | ||
} | ||
} | ||
return additionalParameters; | ||
} | ||
|
||
private static String checkOrCreatePassword(final JsonNode encryption) { | ||
String sslPassword = encryption.has(PARAM_CLIENT_KEY_PASSWORD) ? encryption.get(PARAM_CLIENT_KEY_PASSWORD).asText() : ""; | ||
var keyStorePassword = RandomStringUtils.randomAlphanumeric(10); | ||
if (sslPassword.isEmpty()) { | ||
var file = new File(ENCRYPT_FILE_NAME); | ||
if (file.exists()) { | ||
keyStorePassword = readFile(file); | ||
} else { | ||
try { | ||
createCertificateFile(ENCRYPT_FILE_NAME, keyStorePassword); | ||
} catch (final IOException e) { | ||
throw new RuntimeException("Failed to create encryption file "); | ||
} | ||
} | ||
|
||
} else { | ||
keyStorePassword = sslPassword; | ||
} | ||
return keyStorePassword; | ||
} | ||
|
||
private static String readFile(final File file) { | ||
try { | ||
BufferedReader reader = new BufferedReader(new FileReader(file)); | ||
String currentLine = reader.readLine(); | ||
reader.close(); | ||
return currentLine; | ||
} catch (final IOException e) { | ||
throw new RuntimeException("Failed to read file with encryption"); | ||
} | ||
} | ||
|
||
private static Map<String, String> checkCertificatesAndObtainConnection(final JsonNode encryption, | ||
final String mode, | ||
final String clientKeyPassword) { | ||
var clientCert = encryption.has(PARAM_CLIENT_CERTIFICATE) && | ||
!encryption.get(PARAM_CLIENT_CERTIFICATE).asText().isEmpty() ? encryption.get(PARAM_CLIENT_CERTIFICATE).asText() : null; | ||
var clientKey = encryption.has(PARAM_CLIENT_KEY) && | ||
!encryption.get(PARAM_CLIENT_KEY).asText().isEmpty() ? encryption.get(PARAM_CLIENT_KEY).asText() : null; | ||
if (Objects.nonNull(clientCert) && Objects.nonNull(clientKey)) { | ||
return obtainConnectionWithFullCertificatesOptions(encryption, mode, clientKeyPassword); | ||
} else if (Objects.isNull(clientCert) && Objects.isNull(clientKey)) { | ||
return obtainConnectionWithCaCertificateOptions(encryption, mode, clientKeyPassword); | ||
} else { | ||
throw new RuntimeException("Both fields \"Client certificate\" and \"Client key\" must be added to connect with client certificates."); | ||
} | ||
} | ||
|
||
private static Map<String, String> obtainConnectionWithFullCertificatesOptions(final JsonNode encryption, | ||
final String mode, | ||
final String clientKeyPassword) { | ||
Map<String, String> additionalParameters = new HashMap<>(); | ||
try { | ||
convertAndImportFullCertificate(encryption.get(PARAM_CA_CERTIFICATE).asText(), | ||
encryption.get(PARAM_CLIENT_CERTIFICATE).asText(), | ||
encryption.get(PARAM_CLIENT_KEY).asText(), clientKeyPassword); | ||
} catch (final IOException | InterruptedException e) { | ||
throw new RuntimeException("Failed to import certificate into Java Keystore"); | ||
} | ||
additionalParameters.put(TRUST_KEY_STORE_URL, "file:" + CUSTOM_TRUST_STORE); | ||
additionalParameters.put(TRUST_KEY_STORE_PASS, clientKeyPassword); | ||
additionalParameters.put(CLIENT_KEY_STORE_URL, "file:" + CUSTOM_KEY_STORE); | ||
additionalParameters.put(CLIENT_KEY_STORE_PASS, clientKeyPassword); | ||
additionalParameters.put(SSL_MODE, mode); | ||
|
||
updateTrustStoreSystemProperty(clientKeyPassword); | ||
System.setProperty("javax.net.ssl.keyStore", CUSTOM_KEY_STORE); | ||
System.setProperty("javax.net.ssl.keyStorePassword", clientKeyPassword); | ||
|
||
return additionalParameters; | ||
} | ||
|
||
private static Map<String, String> obtainConnectionWithCaCertificateOptions(final JsonNode encryption, | ||
final String mode, | ||
final String clientKeyPassword) { | ||
Map<String, String> additionalParameters = new HashMap<>(); | ||
try { | ||
convertAndImportCaCertificate(encryption.get(PARAM_CA_CERTIFICATE).asText(), clientKeyPassword); | ||
} catch (final IOException | InterruptedException e) { | ||
throw new RuntimeException("Failed to import certificate into Java Keystore"); | ||
} | ||
additionalParameters.put(TRUST_KEY_STORE_URL, "file:" + CUSTOM_TRUST_STORE); | ||
additionalParameters.put(TRUST_KEY_STORE_PASS, clientKeyPassword); | ||
additionalParameters.put(SSL_MODE, mode); | ||
|
||
updateTrustStoreSystemProperty(clientKeyPassword); | ||
|
||
return additionalParameters; | ||
} | ||
|
||
private static void convertAndImportFullCertificate(final String caCertificate, | ||
final String clientCertificate, | ||
final String clientKey, | ||
final String clientKeyPassword) | ||
throws IOException, InterruptedException { | ||
final Runtime run = Runtime.getRuntime(); | ||
convertAndImportCaCertificate(caCertificate, clientKeyPassword); | ||
createCertificateFile(CLIENT_CERTIFICARE_NAME, clientCertificate); | ||
createCertificateFile(CLIENT_KEY_NAME, clientKey); | ||
// add client certificate to the custom keystore | ||
runProcess("openssl pkcs12 -export -in " + CLIENT_CERTIFICARE_NAME + " -inkey " + CLIENT_KEY_NAME + | ||
" -out " + CLIENT_CERT_P12 + " -name \"certificate\" -passout pass:" + clientKeyPassword, run); | ||
// add client key to the custom keystore | ||
runProcess("keytool -importkeystore -srckeystore " + CLIENT_CERT_P12 + | ||
" -srcstoretype pkcs12 -destkeystore " + CUSTOM_KEY_STORE + " -srcstorepass " + clientKeyPassword + | ||
" -deststoretype JKS -deststorepass " + clientKeyPassword + " -noprompt", run); | ||
} | ||
|
||
private static void convertAndImportCaCertificate(final String caCertificate, | ||
final String clientKeyPassword) | ||
throws IOException, InterruptedException { | ||
final Runtime run = Runtime.getRuntime(); | ||
createCaCertificate(caCertificate, clientKeyPassword, run); | ||
} | ||
|
||
private static void createCaCertificate(final String caCertificate, | ||
final String clientKeyPassword, | ||
final Runtime run) | ||
throws IOException, InterruptedException { | ||
createCertificateFile(ROOT_CERTIFICARE_NAME, caCertificate); | ||
// add CA certificate to the custom keystore | ||
runProcess("openssl x509 -outform der -in " + ROOT_CERTIFICARE_NAME + " -out " + ROOT_CERTIFICARE_DER_NAME, run); | ||
runProcess("keytool -importcert -alias root-certificate -keystore " + CUSTOM_TRUST_STORE | ||
+ " -file " + ROOT_CERTIFICARE_DER_NAME + " -storepass " + clientKeyPassword + " -noprompt", run); | ||
} | ||
|
||
private static void updateTrustStoreSystemProperty(final String clientKeyPassword) { | ||
System.setProperty("javax.net.ssl.trustStore", CUSTOM_TRUST_STORE); | ||
System.setProperty("javax.net.ssl.trustStorePassword", clientKeyPassword); | ||
} | ||
|
||
private static void createCertificateFile(String fileName, String fileValue) throws IOException { | ||
try (final PrintWriter out = new PrintWriter(fileName, StandardCharsets.UTF_8)) { | ||
out.print(fileValue); | ||
} | ||
} | ||
|
||
private static void runProcess(final String cmd, final Runtime run) throws IOException, InterruptedException { | ||
final Process pr = run.exec(cmd); | ||
if (!pr.waitFor(30, TimeUnit.SECONDS)) { | ||
pr.destroy(); | ||
throw new RuntimeException("Timeout while executing: " + cmd); | ||
} | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.