From 4d751f47f5361c060ac19c0258210e7cddfd15ee Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 12:49:11 -0800 Subject: [PATCH 01/29] pass through jdbc params --- .../mssql/MSSQLDestinationTest.java | 20 +++++++ .../destination/mysql/MySQLDestination.java | 6 ++ .../src/main/resources/spec.json | 7 +++ .../mysql/MySQLDestinationTest.java | 60 +++++++++++++++++++ 4 files changed, 93 insertions(+) create mode 100644 airbyte-integrations/connectors/destination-mssql/src/test/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationTest.java create mode 100644 airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java diff --git a/airbyte-integrations/connectors/destination-mssql/src/test/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationTest.java b/airbyte-integrations/connectors/destination-mssql/src/test/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationTest.java new file mode 100644 index 0000000000000..db2ff80851fca --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql/src/test/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationTest.java @@ -0,0 +1,20 @@ +package io.airbyte.integrations.destination.mssql; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import org.junit.jupiter.api.Test; + +public class MSSQLDestinationTest { + +} diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 7bc53a2beb549..5000d4a014a97 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -29,6 +29,8 @@ public class MySQLDestination extends AbstractJdbcDestination implements Destina public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + public static final String ADDITIONAL_PARAMETERS_KEY = "jdbc_url_params"; + public static Destination sshWrappedDestination() { return new SshWrappedDestination(new MySQLDestination(), HOST_KEY, PORT_KEY); } @@ -80,6 +82,10 @@ protected JdbcDatabase getDatabase(final JsonNode config) { public JsonNode toJdbcConfig(final JsonNode config) { final List additionalParameters = new ArrayList<>(); + if (config.has(ADDITIONAL_PARAMETERS_KEY)) { + additionalParameters.add(config.get(ADDITIONAL_PARAMETERS_KEY).asText()); + } + if (!config.has("ssl") || config.get("ssl").asBoolean()) { additionalParameters.add("useSSL=true"); additionalParameters.add("requireSSL=true"); diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json index e705623e1f89f..e774c83e76679 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json @@ -52,6 +52,13 @@ "type": "boolean", "default": true, "order": 5 + }, + "jdbc_url_params": { + "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).", + "title": "JDBC URL Params", + "type": "string", + "default": "", + "order": 6 } } } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java new file mode 100644 index 0000000000000..69755cf9acf56 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -0,0 +1,60 @@ +package io.airbyte.integrations.destination.mysql; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.spy; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.jackson.MoreMappers; +import io.airbyte.commons.json.Jsons; +import org.junit.jupiter.api.Test; + +public class MySQLDestinationTest { + + private static final ObjectMapper mapper = MoreMappers.initMapper(); + + + private MySQLDestination getDestination() { + final MySQLDestination result = spy(MySQLDestination.class); + //doReturn(destinationPath).when(result).getDestinationPath(any()); + return result; + } + + private JsonNode buildConfigNoExtraParams() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.of( + "host", "localhost", + "port", 1337, + "username", "user", + "database", "db" + )); + return config; + } + + private JsonNode buildConfigWithExtraParams() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.of( + "host", "localhost", + "port", 1337, + "username", "user", + "database", "db", + "jdbc_url_params", "key1=value1&key2=value2&key3=value3" + )); + return config; + } + + @Test + void testNoExtraParams() { + JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraParams()); + String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&",url); + } + + @Test + void testExtraParams() { + JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParams()); + String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&key1=value1&key2=value2&key3=value3&useSSL=true&requireSSL=true&verifyServerCertificate=false&", + url); + } +} From 9b6c5bd83105c746de68864ba6b47954fe3a90aa Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 14:03:54 -0800 Subject: [PATCH 02/29] fail if contains verifyServerCertificate --- .../destination/mysql/MySQLDestination.java | 16 +++++++++-- .../mysql/MySQLDestinationTest.java | 28 +++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 5000d4a014a97..8b7904524c84c 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -83,10 +83,18 @@ public JsonNode toJdbcConfig(final JsonNode config) { final List additionalParameters = new ArrayList<>(); if (config.has(ADDITIONAL_PARAMETERS_KEY)) { - additionalParameters.add(config.get(ADDITIONAL_PARAMETERS_KEY).asText()); + String additionalParams = config.get(ADDITIONAL_PARAMETERS_KEY).asText(); + + if (useSSL(config)) { + if (additionalParams.contains("verifyServerCertificate=")) { + throw new RuntimeException(); //FIXME + } + } + + additionalParameters.add(additionalParams); } - if (!config.has("ssl") || config.get("ssl").asBoolean()) { + if (useSSL(config)) { additionalParameters.add("useSSL=true"); additionalParameters.add("requireSSL=true"); additionalParameters.add("verifyServerCertificate=false"); @@ -118,6 +126,10 @@ public JsonNode toJdbcConfig(final JsonNode config) { return Jsons.jsonNode(configBuilder.build()); } + private boolean useSSL(JsonNode config) { + return !config.has("ssl") || config.get("ssl").asBoolean(); + } + public static void main(final String[] args) throws Exception { final Destination destination = MySQLDestination.sshWrappedDestination(); LOGGER.info("starting destination: {}", MySQLDestination.class); diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index 69755cf9acf56..2939a15fcab18 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -1,7 +1,9 @@ package io.airbyte.integrations.destination.mysql; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; import com.fasterxml.jackson.databind.JsonNode; @@ -43,6 +45,17 @@ private JsonNode buildConfigWithExtraParams() { return config; } + private JsonNode buildConfigWithSSLParam() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.of( + "host", "localhost", + "port", 1337, + "username", "user", + "database", "db", + "jdbc_url_params", "verifyServerCertificate=false" + )); + return config; + } + @Test void testNoExtraParams() { JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraParams()); @@ -57,4 +70,19 @@ void testExtraParams() { assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&key1=value1&key2=value2&key3=value3&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url); } + + @Test + void testExtraParamsWithSSLParameter() { + try { + JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithSSLParam()); + String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&key1=value1&key2=value2&key3=value3&useSSL=true&requireSSL=true&verifyServerCertificate=false&", + url); + //FIXME: why can't I use Test(expected = ?) + assertTrue(false); + } catch (RuntimeException e) { +// pass + } + + } } From c924611445ab5db218d191c650263dd1fab45002 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 14:33:26 -0800 Subject: [PATCH 03/29] do the same for all ssl params --- .../destination/mysql/MySQLDestination.java | 40 +++++++++---- .../mysql/MySQLDestinationTest.java | 56 ++++++++++++------- 2 files changed, 65 insertions(+), 31 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 8b7904524c84c..f9721de5508e0 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; @@ -18,6 +19,8 @@ import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,20 +87,23 @@ public JsonNode toJdbcConfig(final JsonNode config) { if (config.has(ADDITIONAL_PARAMETERS_KEY)) { String additionalParams = config.get(ADDITIONAL_PARAMETERS_KEY).asText(); - - if (useSSL(config)) { - if (additionalParams.contains("verifyServerCertificate=")) { - throw new RuntimeException(); //FIXME + if (!additionalParams.isEmpty()) { + if (useSSL(config)) { + for (String p: getSSLParameters().keySet()) { + String paramToFilter = String.format("%s=", p); + if (additionalParams.contains(paramToFilter)) { + throw new RuntimeException(); //FIXME + } + } } + additionalParameters.add(additionalParams); } - - additionalParameters.add(additionalParams); } - if (useSSL(config)) { - additionalParameters.add("useSSL=true"); - additionalParameters.add("requireSSL=true"); - additionalParameters.add("verifyServerCertificate=false"); + for (Entry sslParameter: getSSLParameters().entrySet()) { + String param = formatParameter(sslParameter.getKey(), sslParameter.getValue()); + additionalParameters.add(param); + } } final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s", @@ -109,6 +115,7 @@ public JsonNode toJdbcConfig(final JsonNode config) { // and can't // remove zero date values. // since zero dates are placeholders, we convert them to null by default + //FIXME(girard): do we also want to prevent people from overriding zeroDateTimeBehavior? jdbcUrl.append("?zeroDateTimeBehavior=convertToNull"); if (!additionalParameters.isEmpty()) { jdbcUrl.append("&"); @@ -130,6 +137,19 @@ private boolean useSSL(JsonNode config) { return !config.has("ssl") || config.get("ssl").asBoolean(); } + static Map getSSLParameters() { + Builder builder = ImmutableMap.builder(); + return builder + .put("useSSL", "true") + .put("requireSSL", "true") + .put("verifyServerCertificate", "false") + .build(); + } + + static String formatParameter(String key, String value) { + return String.format("%s=%s", key, value); + } + public static void main(final String[] args) throws Exception { final Destination destination = MySQLDestination.sshWrappedDestination(); LOGGER.info("starting destination: {}", MySQLDestination.class); diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index 2939a15fcab18..a75e8dabcde6d 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -1,26 +1,20 @@ package io.airbyte.integrations.destination.mysql; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; + import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; +import java.util.Map.Entry; import org.junit.jupiter.api.Test; public class MySQLDestinationTest { - private static final ObjectMapper mapper = MoreMappers.initMapper(); - - private MySQLDestination getDestination() { final MySQLDestination result = spy(MySQLDestination.class); - //doReturn(destinationPath).when(result).getDestinationPath(any()); return result; } @@ -34,24 +28,25 @@ private JsonNode buildConfigNoExtraParams() { return config; } - private JsonNode buildConfigWithExtraParams() { + private JsonNode buildConfigWithExtraParam(String extraParam) { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, "username", "user", "database", "db", - "jdbc_url_params", "key1=value1&key2=value2&key3=value3" + "jdbc_url_params", extraParam )); return config; } - private JsonNode buildConfigWithSSLParam() { + private JsonNode buildConfigWithExtraParamWithoutSSL(String extraParam) { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, "username", "user", "database", "db", - "jdbc_url_params", "verifyServerCertificate=false" + "ssl", false, + "jdbc_url_params", extraParam )); return config; } @@ -63,9 +58,17 @@ void testNoExtraParams() { assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&",url); } + @Test + void testEmptyExtraParams() { + JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParam("")); + String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&",url); + } + @Test void testExtraParams() { - JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParams()); + String extraParam = "key1=value1&key2=value2&key3=value3"; + JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParam(extraParam)); String url = jdbcConfig.get("jdbc_url").asText(); assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&key1=value1&key2=value2&key3=value3&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url); @@ -73,16 +76,27 @@ void testExtraParams() { @Test void testExtraParamsWithSSLParameter() { - try { - JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithSSLParam()); + for (Entry entry: MySQLDestination.getSSLParameters().entrySet()) { + String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); + try { + getDestination().toJdbcConfig(buildConfigWithExtraParam(extraParam)); + //FIXME: why can't I use Test(expected = ?) + assertTrue(false); + } catch (RuntimeException e) { + // pass + } + } + } + + @Test + void testExtraParamsWithSSLParameterButNoSSL() { + for (Entry entry: MySQLDestination.getSSLParameters().entrySet()) { + String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); + JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParamWithoutSSL(extraParam)); String url = jdbcConfig.get("jdbc_url").asText(); - assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&key1=value1&key2=value2&key3=value3&useSSL=true&requireSSL=true&verifyServerCertificate=false&", + String expected = String.format("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&%s&", extraParam); + assertEquals(expected, url); - //FIXME: why can't I use Test(expected = ?) - assertTrue(false); - } catch (RuntimeException e) { -// pass } - } } From c959dd8ffa3c96078b25f9be3a9ec2953b9e6bb3 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 14:35:42 -0800 Subject: [PATCH 04/29] delete dead file --- .../mssql/MSSQLDestinationTest.java | 20 ------------------- 1 file changed, 20 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-mssql/src/test/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationTest.java diff --git a/airbyte-integrations/connectors/destination-mssql/src/test/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationTest.java b/airbyte-integrations/connectors/destination-mssql/src/test/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationTest.java deleted file mode 100644 index db2ff80851fca..0000000000000 --- a/airbyte-integrations/connectors/destination-mssql/src/test/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationTest.java +++ /dev/null @@ -1,20 +0,0 @@ -package io.airbyte.integrations.destination.mssql; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.jackson.MoreMappers; -import io.airbyte.commons.json.Jsons; -import org.junit.jupiter.api.Test; - -public class MSSQLDestinationTest { - -} From 4b7d8fe7686514f6893aa8632f43969d5fdd1fcf Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 14:49:18 -0800 Subject: [PATCH 05/29] slight refactor --- .../destination/mysql/MySQLDestination.java | 79 +++++++++++-------- .../mysql/MySQLDestinationTest.java | 52 ++++++------ 2 files changed, 72 insertions(+), 59 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index f9721de5508e0..787e75f191ace 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -32,7 +32,7 @@ public class MySQLDestination extends AbstractJdbcDestination implements Destina public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; - public static final String ADDITIONAL_PARAMETERS_KEY = "jdbc_url_params"; + public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params"; public static Destination sshWrappedDestination() { return new SshWrappedDestination(new MySQLDestination(), HOST_KEY, PORT_KEY); @@ -83,28 +83,7 @@ protected JdbcDatabase getDatabase(final JsonNode config) { @Override public JsonNode toJdbcConfig(final JsonNode config) { - final List additionalParameters = new ArrayList<>(); - - if (config.has(ADDITIONAL_PARAMETERS_KEY)) { - String additionalParams = config.get(ADDITIONAL_PARAMETERS_KEY).asText(); - if (!additionalParams.isEmpty()) { - if (useSSL(config)) { - for (String p: getSSLParameters().keySet()) { - String paramToFilter = String.format("%s=", p); - if (additionalParams.contains(paramToFilter)) { - throw new RuntimeException(); //FIXME - } - } - } - additionalParameters.add(additionalParams); - } - } - if (useSSL(config)) { - for (Entry sslParameter: getSSLParameters().entrySet()) { - String param = formatParameter(sslParameter.getKey(), sslParameter.getValue()); - additionalParameters.add(param); - } - } + final List additionalParameters = getAdditionalParameters(config); final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s", config.get("host").asText(), @@ -115,7 +94,7 @@ public JsonNode toJdbcConfig(final JsonNode config) { // and can't // remove zero date values. // since zero dates are placeholders, we convert them to null by default - //FIXME(girard): do we also want to prevent people from overriding zeroDateTimeBehavior? + //FIXME(girarda): do we also want to prevent people from overriding zeroDateTimeBehavior? jdbcUrl.append("?zeroDateTimeBehavior=convertToNull"); if (!additionalParameters.isEmpty()) { jdbcUrl.append("&"); @@ -133,20 +112,54 @@ public JsonNode toJdbcConfig(final JsonNode config) { return Jsons.jsonNode(configBuilder.build()); } - private boolean useSSL(JsonNode config) { + private List getAdditionalParameters(final JsonNode config) { + final List additionalParameters = new ArrayList<>(); + addJDBCUrlParameters(config, additionalParameters); + if (useSSL(config)) { + addSSLJdbcParameters(additionalParameters); + } + return additionalParameters; + } + + private void addJDBCUrlParameters(final JsonNode config, final List additionalParameters) { + if (config.has(JDBC_URL_PARAMS_KEY)) { + final String additionalParams = config.get(JDBC_URL_PARAMS_KEY).asText(); + if (additionalParams.isBlank()) { + return; + } + if (useSSL(config)) { + for (final String p : getSSLParameters().keySet()) { + final String paramToFilter = String.format("%s=", p); + if (additionalParams.contains(paramToFilter)) { + throw new RuntimeException(); //FIXME + } + } + } + additionalParameters.add(additionalParams); + } + } + + private void addSSLJdbcParameters(final List additionalParameters) { + for (final Entry sslParameter : getSSLParameters().entrySet()) { + final String param = formatParameter(sslParameter.getKey(), sslParameter.getValue()); + additionalParameters.add(param); + } + } + + private boolean useSSL(final JsonNode config) { return !config.has("ssl") || config.get("ssl").asBoolean(); } static Map getSSLParameters() { - Builder builder = ImmutableMap.builder(); - return builder - .put("useSSL", "true") - .put("requireSSL", "true") - .put("verifyServerCertificate", "false") - .build(); + final Builder builder = ImmutableMap.builder(); + return builder + .put("useSSL", "true") + .put("requireSSL", "true") + .put("verifyServerCertificate", "false") + .build(); } - - static String formatParameter(String key, String value) { + + static String formatParameter(final String key, final String value) { return String.format("%s=%s", key, value); } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index a75e8dabcde6d..d2a8e27dbba00 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -1,7 +1,6 @@ package io.airbyte.integrations.destination.mysql; import static org.junit.jupiter.api.Assertions.assertEquals; - import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; @@ -28,7 +27,7 @@ private JsonNode buildConfigNoExtraParams() { return config; } - private JsonNode buildConfigWithExtraParam(String extraParam) { + private JsonNode buildConfigWithExtraParam(final String extraParam) { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, @@ -39,7 +38,7 @@ private JsonNode buildConfigWithExtraParam(String extraParam) { return config; } - private JsonNode buildConfigWithExtraParamWithoutSSL(String extraParam) { + private JsonNode buildConfigWithExtraParamWithoutSSL(final String extraParam) { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, @@ -52,49 +51,50 @@ private JsonNode buildConfigWithExtraParamWithoutSSL(String extraParam) { } @Test - void testNoExtraParams() { - JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraParams()); - String url = jdbcConfig.get("jdbc_url").asText(); - assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&",url); + void testNoExtraParams() { + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraParams()); + final String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url); } @Test - void testEmptyExtraParams() { - JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParam("")); - String url = jdbcConfig.get("jdbc_url").asText(); - assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&",url); + void testEmptyExtraParams() { + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParam("")); + final String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url); } @Test - void testExtraParams() { - String extraParam = "key1=value1&key2=value2&key3=value3"; - JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParam(extraParam)); - String url = jdbcConfig.get("jdbc_url").asText(); - assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&key1=value1&key2=value2&key3=value3&useSSL=true&requireSSL=true&verifyServerCertificate=false&", + void testExtraParams() { + final String extraParam = "key1=value1&key2=value2&key3=value3"; + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParam(extraParam)); + final String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals( + "jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&key1=value1&key2=value2&key3=value3&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url); } @Test - void testExtraParamsWithSSLParameter() { - for (Entry entry: MySQLDestination.getSSLParameters().entrySet()) { - String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); + void testExtraParamsWithSSLParameter() { + for (final Entry entry : MySQLDestination.getSSLParameters().entrySet()) { + final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); try { getDestination().toJdbcConfig(buildConfigWithExtraParam(extraParam)); //FIXME: why can't I use Test(expected = ?) assertTrue(false); - } catch (RuntimeException e) { + } catch (final RuntimeException e) { // pass } } } @Test - void testExtraParamsWithSSLParameterButNoSSL() { - for (Entry entry: MySQLDestination.getSSLParameters().entrySet()) { - String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); - JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParamWithoutSSL(extraParam)); - String url = jdbcConfig.get("jdbc_url").asText(); - String expected = String.format("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&%s&", extraParam); + void testExtraParamsWithSSLParameterButNoSSL() { + for (final Entry entry : MySQLDestination.getSSLParameters().entrySet()) { + final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParamWithoutSSL(extraParam)); + final String url = jdbcConfig.get("jdbc_url").asText(); + final String expected = String.format("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&%s&", extraParam); assertEquals(expected, url); } From e401db1b3252c3aa0b254cc61273c0e5e4383904 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 14:55:55 -0800 Subject: [PATCH 06/29] new method --- .../integrations/destination/mysql/MySQLDestination.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 787e75f191ace..7aa14baf2a0aa 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -129,8 +129,7 @@ private void addJDBCUrlParameters(final JsonNode config, final List addi } if (useSSL(config)) { for (final String p : getSSLParameters().keySet()) { - final String paramToFilter = String.format("%s=", p); - if (additionalParams.contains(paramToFilter)) { + if (containsParameterKey(additionalParams, p)) { throw new RuntimeException(); //FIXME } } @@ -139,6 +138,11 @@ private void addJDBCUrlParameters(final JsonNode config, final List addi } } + private boolean containsParameterKey(final String additionalParametersString, final String key) { + final String s = String.format("%s=", key); + return additionalParametersString.contains(s); + } + private void addSSLJdbcParameters(final List additionalParameters) { for (final Entry sslParameter : getSSLParameters().entrySet()) { final String param = formatParameter(sslParameter.getKey(), sslParameter.getValue()); From 76d8595143c621b9b519e7112b9ac7856d019498 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 15:08:00 -0800 Subject: [PATCH 07/29] remove default value --- .../src/main/resources/spec.json | 17 ++++++++--- .../mysql/MySQLDestinationTest.java | 28 ++++++++----------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json index e774c83e76679..212dd541d84a7 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json @@ -3,12 +3,20 @@ "supportsIncremental": true, "supportsNormalization": true, "supportsDBT": true, - "supported_destination_sync_modes": ["overwrite", "append"], + "supported_destination_sync_modes": [ + "overwrite", + "append" + ], "connectionSpecification": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "MySQL Destination Spec", "type": "object", - "required": ["host", "port", "username", "database"], + "required": [ + "host", + "port", + "username", + "database" + ], "additionalProperties": true, "properties": { "host": { @@ -24,7 +32,9 @@ "minimum": 0, "maximum": 65536, "default": 3306, - "examples": ["3306"], + "examples": [ + "3306" + ], "order": 1 }, "database": { @@ -57,7 +67,6 @@ "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).", "title": "JDBC URL Params", "type": "string", - "default": "", "order": 6 } } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index d2a8e27dbba00..ad0ef462a90f0 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -1,7 +1,7 @@ package io.airbyte.integrations.destination.mysql; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.spy; import com.fasterxml.jackson.databind.JsonNode; @@ -17,7 +17,7 @@ private MySQLDestination getDestination() { return result; } - private JsonNode buildConfigNoExtraParams() { + private JsonNode buildConfigNoJDBCParameters() { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, @@ -27,7 +27,7 @@ private JsonNode buildConfigNoExtraParams() { return config; } - private JsonNode buildConfigWithExtraParam(final String extraParam) { + private JsonNode buildConfigWithExtraJDBCParameters(final String extraParam) { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, @@ -38,7 +38,7 @@ private JsonNode buildConfigWithExtraParam(final String extraParam) { return config; } - private JsonNode buildConfigWithExtraParamWithoutSSL(final String extraParam) { + private JsonNode buildConfigWithExtraJDBCParametersNoSSL(final String extraParam) { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, @@ -52,14 +52,14 @@ private JsonNode buildConfigWithExtraParamWithoutSSL(final String extraParam) { @Test void testNoExtraParams() { - final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraParams()); + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJDBCParameters()); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url); } @Test void testEmptyExtraParams() { - final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParam("")); + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters("")); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url); } @@ -67,7 +67,7 @@ void testEmptyExtraParams() { @Test void testExtraParams() { final String extraParam = "key1=value1&key2=value2&key3=value3"; - final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParam(extraParam)); + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals( "jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&key1=value1&key2=value2&key3=value3&useSSL=true&requireSSL=true&verifyServerCertificate=false&", @@ -78,21 +78,17 @@ void testExtraParams() { void testExtraParamsWithSSLParameter() { for (final Entry entry : MySQLDestination.getSSLParameters().entrySet()) { final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); - try { - getDestination().toJdbcConfig(buildConfigWithExtraParam(extraParam)); - //FIXME: why can't I use Test(expected = ?) - assertTrue(false); - } catch (final RuntimeException e) { - // pass - } + assertThrows(RuntimeException.class, () -> + getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)) + ); } } @Test - void testExtraParamsWithSSLParameterButNoSSL() { + void testExtraParamsWithSSLParameterNoSSL() { for (final Entry entry : MySQLDestination.getSSLParameters().entrySet()) { final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); - final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraParamWithoutSSL(extraParam)); + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParametersNoSSL(extraParam)); final String url = jdbcConfig.get("jdbc_url").asText(); final String expected = String.format("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&%s&", extraParam); assertEquals(expected, From 4e592ed78695fe4f9210b1f2ed5cc9ca6a3a7df9 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 15:12:10 -0800 Subject: [PATCH 08/29] error message --- .../integrations/destination/mysql/MySQLDestination.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 7aa14baf2a0aa..7c1f8d5f2b887 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -130,7 +130,7 @@ private void addJDBCUrlParameters(final JsonNode config, final List addi if (useSSL(config)) { for (final String p : getSSLParameters().keySet()) { if (containsParameterKey(additionalParams, p)) { - throw new RuntimeException(); //FIXME + throw new RuntimeException("Cannot overwrite JDBC parameter " + p); } } } From c90021bbaeb320c3eff0e93f0bf34adb905f6ea2 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 15:15:47 -0800 Subject: [PATCH 09/29] rename --- .../integrations/destination/mysql/MySQLDestination.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 7c1f8d5f2b887..aed3d41cb28d4 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -116,7 +116,7 @@ private List getAdditionalParameters(final JsonNode config) { final List additionalParameters = new ArrayList<>(); addJDBCUrlParameters(config, additionalParameters); if (useSSL(config)) { - addSSLJdbcParameters(additionalParameters); + addSSLJDBCParameters(additionalParameters); } return additionalParameters; } @@ -143,7 +143,7 @@ private boolean containsParameterKey(final String additionalParametersString, fi return additionalParametersString.contains(s); } - private void addSSLJdbcParameters(final List additionalParameters) { + private void addSSLJDBCParameters(final List additionalParameters) { for (final Entry sslParameter : getSSLParameters().entrySet()) { final String param = formatParameter(sslParameter.getKey(), sslParameter.getValue()); additionalParameters.add(param); From 51b13aa8564d32d53425991ce48547c898695a3a Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 17:25:19 -0800 Subject: [PATCH 10/29] update as per comments --- .../destination/mysql/MySQLDestination.java | 88 ++++++++++--------- .../mysql/MySQLDestinationTest.java | 19 ++-- 2 files changed, 53 insertions(+), 54 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index aed3d41cb28d4..be4f30014550c 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; +import com.google.common.collect.Streams; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; @@ -17,10 +17,13 @@ import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; -import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +37,15 @@ public class MySQLDestination extends AbstractJdbcDestination implements Destina public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params"; + static final Map SSL_JDBC_PARAMETERS = ImmutableMap.of( + "useSSL", "true", + "requireSSL", "true", + "verifyServerCertificate", "false" + ); + static final Map DEFAULT_JDBC_PARAMETERS = ImmutableMap.of( + "zeroDateTimeBehavior", "convertToNull" + ); + public static Destination sshWrappedDestination() { return new SshWrappedDestination(new MySQLDestination(), HOST_KEY, PORT_KEY); } @@ -94,10 +106,8 @@ public JsonNode toJdbcConfig(final JsonNode config) { // and can't // remove zero date values. // since zero dates are placeholders, we convert them to null by default - //FIXME(girarda): do we also want to prevent people from overriding zeroDateTimeBehavior? - jdbcUrl.append("?zeroDateTimeBehavior=convertToNull"); if (!additionalParameters.isEmpty()) { - jdbcUrl.append("&"); + jdbcUrl.append("?"); additionalParameters.forEach(x -> jdbcUrl.append(x).append("&")); } @@ -113,56 +123,50 @@ public JsonNode toJdbcConfig(final JsonNode config) { } private List getAdditionalParameters(final JsonNode config) { - final List additionalParameters = new ArrayList<>(); - addJDBCUrlParameters(config, additionalParameters); + final Map customParameters = getCustomJdbcParameters(config); + if (useSSL(config)) { - addSSLJDBCParameters(additionalParameters); + return convertToJdbcStrings(customParameters, List.of(DEFAULT_JDBC_PARAMETERS, SSL_JDBC_PARAMETERS)); + } else { + return convertToJdbcStrings(customParameters, List.of(DEFAULT_JDBC_PARAMETERS)); } - return additionalParameters; } - private void addJDBCUrlParameters(final JsonNode config, final List additionalParameters) { - if (config.has(JDBC_URL_PARAMS_KEY)) { - final String additionalParams = config.get(JDBC_URL_PARAMS_KEY).asText(); - if (additionalParams.isBlank()) { - return; - } - if (useSSL(config)) { - for (final String p : getSSLParameters().keySet()) { - if (containsParameterKey(additionalParams, p)) { - throw new RuntimeException("Cannot overwrite JDBC parameter " + p); - } - } - } - additionalParameters.add(additionalParams); + private List convertToJdbcStrings(final Map customParameters, final List> maps) { + final Set keys = maps.stream() + .map(Map::keySet) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + final boolean hasDuplicateKeys = keys.stream().anyMatch(customParameters::containsKey); + if (hasDuplicateKeys) { + throw new RuntimeException(); // TODO } + return Streams.concat(Stream.of(customParameters), maps.stream()) + .map(Map::entrySet) + .flatMap(Collection::stream) + .map(entry -> formatParameter(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); } - private boolean containsParameterKey(final String additionalParametersString, final String key) { - final String s = String.format("%s=", key); - return additionalParametersString.contains(s); - } - - private void addSSLJDBCParameters(final List additionalParameters) { - for (final Entry sslParameter : getSSLParameters().entrySet()) { - final String param = formatParameter(sslParameter.getKey(), sslParameter.getValue()); - additionalParameters.add(param); + private Map getCustomJdbcParameters(final JsonNode config) { + final Map parameters = new HashMap<>(); + if (config.has(JDBC_URL_PARAMS_KEY)) { + final String jdbcParams = config.get(JDBC_URL_PARAMS_KEY).asText(); + if (!jdbcParams.isBlank()) { + final String[] keyValuePairs = jdbcParams.split("&"); + for (final String kv : keyValuePairs) { + final String[] split = kv.split("="); + parameters.put(split[0], split[1]); + } + } } + return parameters; } private boolean useSSL(final JsonNode config) { return !config.has("ssl") || config.get("ssl").asBoolean(); } - static Map getSSLParameters() { - final Builder builder = ImmutableMap.builder(); - return builder - .put("useSSL", "true") - .put("requireSSL", "true") - .put("verifyServerCertificate", "false") - .build(); - } - static String formatParameter(final String key, final String value) { return String.format("%s=%s", key, value); } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index ad0ef462a90f0..4965be459a479 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -70,29 +70,24 @@ void testExtraParams() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals( - "jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&key1=value1&key2=value2&key3=value3&useSSL=true&requireSSL=true&verifyServerCertificate=false&", + "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url); } @Test - void testExtraParamsWithSSLParameter() { - for (final Entry entry : MySQLDestination.getSSLParameters().entrySet()) { + void testExtraParamsWithDefaultParameter() { + for (final Entry entry : MySQLDestination.SSL_JDBC_PARAMETERS.entrySet()) { final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); assertThrows(RuntimeException.class, () -> getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)) ); } - } - @Test - void testExtraParamsWithSSLParameterNoSSL() { - for (final Entry entry : MySQLDestination.getSSLParameters().entrySet()) { + for (final Entry entry : MySQLDestination.DEFAULT_JDBC_PARAMETERS.entrySet()) { final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); - final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParametersNoSSL(extraParam)); - final String url = jdbcConfig.get("jdbc_url").asText(); - final String expected = String.format("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&%s&", extraParam); - assertEquals(expected, - url); + assertThrows(RuntimeException.class, () -> + getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)) + ); } } } From 95e1a83613a246da534be014b7266b1018b6bb52 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 17:27:50 -0800 Subject: [PATCH 11/29] Update exception message --- .../integrations/destination/mysql/MySQLDestination.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index be4f30014550c..f81eb856e63e9 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -137,9 +137,9 @@ private List convertToJdbcStrings(final Map customParame .map(Map::keySet) .flatMap(Collection::stream) .collect(Collectors.toSet()); - final boolean hasDuplicateKeys = keys.stream().anyMatch(customParameters::containsKey); - if (hasDuplicateKeys) { - throw new RuntimeException(); // TODO + final List duplicateKeys = keys.stream().filter(customParameters::containsKey).toList(); + if (!duplicateKeys.isEmpty()) { + throw new RuntimeException("Cannot overwrite default JDBC parameter " + duplicateKeys); } return Streams.concat(Stream.of(customParameters), maps.stream()) .map(Map::entrySet) From 607f1019d8687c22a89ee33f8479628e133a67f3 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 15 Feb 2022 18:59:10 -0800 Subject: [PATCH 12/29] Bump version --- airbyte-integrations/connectors/destination-mysql/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-mysql/Dockerfile b/airbyte-integrations/connectors/destination-mysql/Dockerfile index ac835aba62d67..b137bdd41e32b 100644 --- a/airbyte-integrations/connectors/destination-mysql/Dockerfile +++ b/airbyte-integrations/connectors/destination-mysql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.16 +LABEL io.airbyte.version=0.1.17 LABEL io.airbyte.name=airbyte/destination-mysql From c55002826dfe081f0a77d4db1fc40ab01adc8633 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 08:18:28 -0800 Subject: [PATCH 13/29] extract to method --- .../destination/mysql/MySQLDestination.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index f81eb856e63e9..0f5fdce095cb5 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -132,20 +132,26 @@ private List getAdditionalParameters(final JsonNode config) { } } - private List convertToJdbcStrings(final Map customParameters, final List> maps) { - final Set keys = maps.stream() + private List convertToJdbcStrings(final Map customParameters, final List> defaultParametersMaps) { + assertCustomParametersDontOverwriteDefaultParameters(customParameters, defaultParametersMaps); + return Streams.concat(Stream.of(customParameters), defaultParametersMaps.stream()) + .map(Map::entrySet) + .flatMap(Collection::stream) + .map(entry -> formatParameter(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); + } + + private void assertCustomParametersDontOverwriteDefaultParameters(final Map customParameters, + final List> defaultParametersMaps) { + final Set keys = defaultParametersMaps.stream() .map(Map::keySet) .flatMap(Collection::stream) .collect(Collectors.toSet()); + final List duplicateKeys = keys.stream().filter(customParameters::containsKey).toList(); if (!duplicateKeys.isEmpty()) { throw new RuntimeException("Cannot overwrite default JDBC parameter " + duplicateKeys); } - return Streams.concat(Stream.of(customParameters), maps.stream()) - .map(Map::entrySet) - .flatMap(Collection::stream) - .map(entry -> formatParameter(entry.getKey(), entry.getValue())) - .collect(Collectors.toList()); } private Map getCustomJdbcParameters(final JsonNode config) { From 097906fe64f81f03f80d7a2e16ee295e06e7e6e7 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 13:12:02 -0800 Subject: [PATCH 14/29] Update doc --- docs/integrations/destinations/mysql.md | 98 ++++++++++++++++++------- 1 file changed, 72 insertions(+), 26 deletions(-) diff --git a/docs/integrations/destinations/mysql.md b/docs/integrations/destinations/mysql.md index b85993fd8b7ba..35c17a3e25473 100644 --- a/docs/integrations/destinations/mysql.md +++ b/docs/integrations/destinations/mysql.md @@ -1,9 +1,10 @@ # MySQL There are two flavors of connectors for this destination: -1. destination-mysql connector. Supports both SSL and non SSL connections. -2. destination-mysql-strict-encrypt connector. Pretty same as connector above, but supports SSL connections only. +1. destination-mysql connector. Supports both SSL and non SSL connections. +2. destination-mysql-strict-encrypt connector. Pretty same as connector above, but supports SSL + connections only. ## Features @@ -19,13 +20,16 @@ There are two flavors of connectors for this destination: Each stream will be output into its own table in MySQL. Each table will contain 3 columns: -* `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in MySQL is `VARCHAR(256)`. -* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in MySQL is `TIMESTAMP(6)`. +* `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in + MySQL is `VARCHAR(256)`. +* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. + The column type in MySQL is `TIMESTAMP(6)`. * `_airbyte_data`: a json blob representing with the event data. The column type in MySQL is `JSON`. ## Getting Started \(Airbyte Cloud\) -Airbyte Cloud only supports connecting to your MySQL instance with TLS encryption. Other than that, you can proceed with the open-source instructions below. +Airbyte Cloud only supports connecting to your MySQL instance with TLS encryption. Other than that, +you can proceed with the open-source instructions below. ## Getting Started \(Airbyte Open-Source\) @@ -38,31 +42,54 @@ To use the MySQL destination, you'll need: #### Network Access -Make sure your MySQL database can be accessed by Airbyte. If your database is within a VPC, you may need to allow access from the IP you're using to expose Airbyte. +Make sure your MySQL database can be accessed by Airbyte. If your database is within a VPC, you may +need to allow access from the IP you're using to expose Airbyte. #### **Permissions** -You need a MySQL user with `CREATE, INSERT, SELECT, DROP` permissions. We highly recommend creating an Airbyte-specific user for this purpose. +You need a MySQL user with `CREATE, INSERT, SELECT, DROP` permissions. We highly recommend creating +an Airbyte-specific user for this purpose. #### Target Database -MySQL doesn't differentiate between a database and schema. A database is essentially a schema where all the tables live in. You will need to choose an existing database or create a new database. This will act as a default database/schema where the tables will be created if the source doesn't provide a namespace. +MySQL doesn't differentiate between a database and schema. A database is essentially a schema where +all the tables live in. You will need to choose an existing database or create a new database. This +will act as a default database/schema where the tables will be created if the source doesn't provide +a namespace. ### Setup the MySQL destination in Airbyte -Before setting up MySQL destination in Airbyte, you need to set the [local\_infile](https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_local_infile) system variable to true. You can do this by running the query `SET GLOBAL local_infile = true` with a user with [SYSTEM\_VARIABLES\_ADMIN](https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_system-variables-admin) permission. This is required cause Airbyte uses `LOAD DATA LOCAL INFILE` to load data into table. +Before setting up MySQL destination in Airbyte, you need to set +the [local\_infile](https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_local_infile) +system variable to true. You can do this by running the query `SET GLOBAL local_infile = true` with +a user +with [SYSTEM\_VARIABLES\_ADMIN](https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_system-variables-admin) +permission. This is required cause Airbyte uses `LOAD DATA LOCAL INFILE` to load data into table. -You should now have all the requirements needed to configure MySQL as a destination in the UI. You'll need the following information to configure the MySQL destination: +You should now have all the requirements needed to configure MySQL as a destination in the UI. +You'll need the following information to configure the MySQL destination: * **Host** * **Port** * **Username** * **Password** * **Database** +* **jdbc_url_params** (Optional) + +### Default JDBC URL Parameters + +The following JDBC URL parameters are set by Airbyte and cannot be overridden by +the `jdbc_url_params` field: + +* useSSL +* requireSSL +* verifyServerCertificate +* zeroDateTimeBehavior ## Known Limitations -Note that MySQL documentation discusses identifiers case sensitivity using the `lower_case_table_names` system variable. One of their recommendations is: +Note that MySQL documentation discusses identifiers case sensitivity using +the `lower_case_table_names` system variable. One of their recommendations is: ```text "It is best to adopt a consistent convention, such as always creating and referring to databases and tables using lowercase names. @@ -71,30 +98,50 @@ Note that MySQL documentation discusses identifiers case sensitivity using the ` [Source: MySQL docs](https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html) -As a result, Airbyte MySQL destination forces all identifier \(table, schema and columns\) names to be lowercase. +As a result, Airbyte MySQL destination forces all identifier \(table, schema and columns\) names to +be lowercase. ## Connection via SSH Tunnel -Airbyte has the ability to connect to a MySQl instance via an SSH Tunnel. The reason you might want to do this because it is not possible \(or against security policy\) to connect to the database directly \(e.g. it does not have a public IP address\). +Airbyte has the ability to connect to a MySQl instance via an SSH Tunnel. The reason you might want +to do this because it is not possible \(or against security policy\) to connect to the database +directly \(e.g. it does not have a public IP address\). -When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server \(a.k.a. a bastion sever\) that _does_ have direct access to the database. Airbyte connects to the bastion and then asks the bastion to connect directly to the server. +When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server \(a.k.a. +a bastion sever\) that _does_ have direct access to the database. Airbyte connects to the bastion +and then asks the bastion to connect directly to the server. -Using this feature requires additional configuration, when creating the destination. We will talk through what each piece of configuration means. +Using this feature requires additional configuration, when creating the destination. We will talk +through what each piece of configuration means. 1. Configure all fields for the destination as you normally would, except `SSH Tunnel Method`. -2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. - 1. Choose `Key Authentication` if you will be using an RSA private key as your secret for establishing the SSH Tunnel \(see below for more information on generating this key\). - 2. Choose `Password Authentication` if you will be using a password as your secret for establishing the SSH Tunnel. -3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will connect to. This should be a hostname or an IP Address. -4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. The default port for SSH connections is `22`, so unless you have explicitly changed something, go with the default. -5. `SSH Login Username` is the username that Airbyte should use when connection to the bastion server. This is NOT the MySQl username. -6. If you are using `Password Authentication`, then `SSH Login Username` should be set to the password of the User from the previous step. If you are using `SSH Key Authentication` leave this blank. Again, this is not the MySQl password, but the password for the OS-user that Airbyte is using to perform commands on the bastion. -7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA Private Key that you are using to create the SSH connection. This should be the full contents of the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending with `-----END RSA PRIVATE KEY-----`. +2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use + an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. + 1. Choose `Key Authentication` if you will be using an RSA private key as your secret for + establishing the SSH Tunnel \(see below for more information on generating this key\). + 2. Choose `Password Authentication` if you will be using a password as your secret for + establishing the SSH Tunnel. +3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will + connect to. This should be a hostname or an IP Address. +4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. + The default port for SSH connections is `22`, so unless you have explicitly changed something, go + with the default. +5. `SSH Login Username` is the username that Airbyte should use when connection to the bastion + server. This is NOT the MySQl username. +6. If you are using `Password Authentication`, then `SSH Login Username` should be set to the + password of the User from the previous step. If you are using `SSH Key Authentication` leave this + blank. Again, this is not the MySQl password, but the password for the OS-user that Airbyte is + using to perform commands on the bastion. +7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA + Private Key that you are using to create the SSH connection. This should be the full contents of + the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending + with `-----END RSA PRIVATE KEY-----`. ## CHANGELOG -| Version | Date | Pull Request | Subject | -|:--------| :--- | :--- | :--- | +| Version | Date | Pull Request | Subject | +|:--------|:-----------| :--- | :--- | +| 0.1.17 | 2022-02-16 | [10362](https://github.com/airbytehq/airbyte/pull/10362) | Add jdbc_url_params support for optional JDBC parameters | | 0.1.16 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option | | 0.1.15 | 2021-12-01 | [8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key | | 0.1.14 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count | @@ -111,7 +158,6 @@ Using this feature requires additional configuration, when creating the destinat | 0.1.1 | 2021-07-03 | [\#3289](https://github.com/airbytehq/airbyte/pull/3289) | Added support for outputting messages. | | 0.1.0 | 2021-05-06 | [\#3242](https://github.com/airbytehq/airbyte/pull/3242) | Added MySQL destination. | - ## CHANGELOG destination-mysql-strict-encrypt | Version | Date | Pull Request | Subject | From 0e9e8ea0a960a8618acfc0a931e7a0b098450779 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 13:14:18 -0800 Subject: [PATCH 15/29] Revert "Update doc" This reverts commit 097906fe64f81f03f80d7a2e16ee295e06e7e6e7. --- docs/integrations/destinations/mysql.md | 98 +++++++------------------ 1 file changed, 26 insertions(+), 72 deletions(-) diff --git a/docs/integrations/destinations/mysql.md b/docs/integrations/destinations/mysql.md index 35c17a3e25473..b85993fd8b7ba 100644 --- a/docs/integrations/destinations/mysql.md +++ b/docs/integrations/destinations/mysql.md @@ -1,10 +1,9 @@ # MySQL There are two flavors of connectors for this destination: - 1. destination-mysql connector. Supports both SSL and non SSL connections. -2. destination-mysql-strict-encrypt connector. Pretty same as connector above, but supports SSL - connections only. +2. destination-mysql-strict-encrypt connector. Pretty same as connector above, but supports SSL connections only. + ## Features @@ -20,16 +19,13 @@ There are two flavors of connectors for this destination: Each stream will be output into its own table in MySQL. Each table will contain 3 columns: -* `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in - MySQL is `VARCHAR(256)`. -* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. - The column type in MySQL is `TIMESTAMP(6)`. +* `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in MySQL is `VARCHAR(256)`. +* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in MySQL is `TIMESTAMP(6)`. * `_airbyte_data`: a json blob representing with the event data. The column type in MySQL is `JSON`. ## Getting Started \(Airbyte Cloud\) -Airbyte Cloud only supports connecting to your MySQL instance with TLS encryption. Other than that, -you can proceed with the open-source instructions below. +Airbyte Cloud only supports connecting to your MySQL instance with TLS encryption. Other than that, you can proceed with the open-source instructions below. ## Getting Started \(Airbyte Open-Source\) @@ -42,54 +38,31 @@ To use the MySQL destination, you'll need: #### Network Access -Make sure your MySQL database can be accessed by Airbyte. If your database is within a VPC, you may -need to allow access from the IP you're using to expose Airbyte. +Make sure your MySQL database can be accessed by Airbyte. If your database is within a VPC, you may need to allow access from the IP you're using to expose Airbyte. #### **Permissions** -You need a MySQL user with `CREATE, INSERT, SELECT, DROP` permissions. We highly recommend creating -an Airbyte-specific user for this purpose. +You need a MySQL user with `CREATE, INSERT, SELECT, DROP` permissions. We highly recommend creating an Airbyte-specific user for this purpose. #### Target Database -MySQL doesn't differentiate between a database and schema. A database is essentially a schema where -all the tables live in. You will need to choose an existing database or create a new database. This -will act as a default database/schema where the tables will be created if the source doesn't provide -a namespace. +MySQL doesn't differentiate between a database and schema. A database is essentially a schema where all the tables live in. You will need to choose an existing database or create a new database. This will act as a default database/schema where the tables will be created if the source doesn't provide a namespace. ### Setup the MySQL destination in Airbyte -Before setting up MySQL destination in Airbyte, you need to set -the [local\_infile](https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_local_infile) -system variable to true. You can do this by running the query `SET GLOBAL local_infile = true` with -a user -with [SYSTEM\_VARIABLES\_ADMIN](https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_system-variables-admin) -permission. This is required cause Airbyte uses `LOAD DATA LOCAL INFILE` to load data into table. +Before setting up MySQL destination in Airbyte, you need to set the [local\_infile](https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_local_infile) system variable to true. You can do this by running the query `SET GLOBAL local_infile = true` with a user with [SYSTEM\_VARIABLES\_ADMIN](https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_system-variables-admin) permission. This is required cause Airbyte uses `LOAD DATA LOCAL INFILE` to load data into table. -You should now have all the requirements needed to configure MySQL as a destination in the UI. -You'll need the following information to configure the MySQL destination: +You should now have all the requirements needed to configure MySQL as a destination in the UI. You'll need the following information to configure the MySQL destination: * **Host** * **Port** * **Username** * **Password** * **Database** -* **jdbc_url_params** (Optional) - -### Default JDBC URL Parameters - -The following JDBC URL parameters are set by Airbyte and cannot be overridden by -the `jdbc_url_params` field: - -* useSSL -* requireSSL -* verifyServerCertificate -* zeroDateTimeBehavior ## Known Limitations -Note that MySQL documentation discusses identifiers case sensitivity using -the `lower_case_table_names` system variable. One of their recommendations is: +Note that MySQL documentation discusses identifiers case sensitivity using the `lower_case_table_names` system variable. One of their recommendations is: ```text "It is best to adopt a consistent convention, such as always creating and referring to databases and tables using lowercase names. @@ -98,50 +71,30 @@ the `lower_case_table_names` system variable. One of their recommendations is: [Source: MySQL docs](https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html) -As a result, Airbyte MySQL destination forces all identifier \(table, schema and columns\) names to -be lowercase. +As a result, Airbyte MySQL destination forces all identifier \(table, schema and columns\) names to be lowercase. ## Connection via SSH Tunnel -Airbyte has the ability to connect to a MySQl instance via an SSH Tunnel. The reason you might want -to do this because it is not possible \(or against security policy\) to connect to the database -directly \(e.g. it does not have a public IP address\). +Airbyte has the ability to connect to a MySQl instance via an SSH Tunnel. The reason you might want to do this because it is not possible \(or against security policy\) to connect to the database directly \(e.g. it does not have a public IP address\). -When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server \(a.k.a. -a bastion sever\) that _does_ have direct access to the database. Airbyte connects to the bastion -and then asks the bastion to connect directly to the server. +When using an SSH tunnel, you are configuring Airbyte to connect to an intermediate server \(a.k.a. a bastion sever\) that _does_ have direct access to the database. Airbyte connects to the bastion and then asks the bastion to connect directly to the server. -Using this feature requires additional configuration, when creating the destination. We will talk -through what each piece of configuration means. +Using this feature requires additional configuration, when creating the destination. We will talk through what each piece of configuration means. 1. Configure all fields for the destination as you normally would, except `SSH Tunnel Method`. -2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use - an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. - 1. Choose `Key Authentication` if you will be using an RSA private key as your secret for - establishing the SSH Tunnel \(see below for more information on generating this key\). - 2. Choose `Password Authentication` if you will be using a password as your secret for - establishing the SSH Tunnel. -3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will - connect to. This should be a hostname or an IP Address. -4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. - The default port for SSH connections is `22`, so unless you have explicitly changed something, go - with the default. -5. `SSH Login Username` is the username that Airbyte should use when connection to the bastion - server. This is NOT the MySQl username. -6. If you are using `Password Authentication`, then `SSH Login Username` should be set to the - password of the User from the previous step. If you are using `SSH Key Authentication` leave this - blank. Again, this is not the MySQl password, but the password for the OS-user that Airbyte is - using to perform commands on the bastion. -7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA - Private Key that you are using to create the SSH connection. This should be the full contents of - the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending - with `-----END RSA PRIVATE KEY-----`. +2. `SSH Tunnel Method` defaults to `No Tunnel` \(meaning a direct connection\). If you want to use an SSH Tunnel choose `SSH Key Authentication` or `Password Authentication`. + 1. Choose `Key Authentication` if you will be using an RSA private key as your secret for establishing the SSH Tunnel \(see below for more information on generating this key\). + 2. Choose `Password Authentication` if you will be using a password as your secret for establishing the SSH Tunnel. +3. `SSH Tunnel Jump Server Host` refers to the intermediate \(bastion\) server that Airbyte will connect to. This should be a hostname or an IP Address. +4. `SSH Connection Port` is the port on the bastion server with which to make the SSH connection. The default port for SSH connections is `22`, so unless you have explicitly changed something, go with the default. +5. `SSH Login Username` is the username that Airbyte should use when connection to the bastion server. This is NOT the MySQl username. +6. If you are using `Password Authentication`, then `SSH Login Username` should be set to the password of the User from the previous step. If you are using `SSH Key Authentication` leave this blank. Again, this is not the MySQl password, but the password for the OS-user that Airbyte is using to perform commands on the bastion. +7. If you are using `SSH Key Authentication`, then `SSH Private Key` should be set to the RSA Private Key that you are using to create the SSH connection. This should be the full contents of the key file starting with `-----BEGIN RSA PRIVATE KEY-----` and ending with `-----END RSA PRIVATE KEY-----`. ## CHANGELOG -| Version | Date | Pull Request | Subject | -|:--------|:-----------| :--- | :--- | -| 0.1.17 | 2022-02-16 | [10362](https://github.com/airbytehq/airbyte/pull/10362) | Add jdbc_url_params support for optional JDBC parameters | +| Version | Date | Pull Request | Subject | +|:--------| :--- | :--- | :--- | | 0.1.16 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option | | 0.1.15 | 2021-12-01 | [8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key | | 0.1.14 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count | @@ -158,6 +111,7 @@ through what each piece of configuration means. | 0.1.1 | 2021-07-03 | [\#3289](https://github.com/airbytehq/airbyte/pull/3289) | Added support for outputting messages. | | 0.1.0 | 2021-05-06 | [\#3242](https://github.com/airbytehq/airbyte/pull/3242) | Added MySQL destination. | + ## CHANGELOG destination-mysql-strict-encrypt | Version | Date | Pull Request | Subject | From 9744ba4981c253e8b2b49e206d51c664fcddbd19 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 13:18:20 -0800 Subject: [PATCH 16/29] Update doc --- docs/integrations/destinations/mysql.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/integrations/destinations/mysql.md b/docs/integrations/destinations/mysql.md index b85993fd8b7ba..3227f8e48121a 100644 --- a/docs/integrations/destinations/mysql.md +++ b/docs/integrations/destinations/mysql.md @@ -59,6 +59,16 @@ You should now have all the requirements needed to configure MySQL as a destinat * **Username** * **Password** * **Database** +* **jdbc_url_params** (Optional) + +### Default JDBC URL Parameters + +The following JDBC URL parameters are set by Airbyte and cannot be overridden by the `jdbc_url_params` field: + +* useSSL +* requireSSL +* verifyServerCertificate +* zeroDateTimeBehavior ## Known Limitations @@ -95,6 +105,7 @@ Using this feature requires additional configuration, when creating the destinat | Version | Date | Pull Request | Subject | |:--------| :--- | :--- | :--- | +| 0.1.17 | 2022-02-16 | [10362](https://github.com/airbytehq/airbyte/pull/10362) | Add jdbc_url_params support for optional JDBC parameters | | 0.1.16 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option | | 0.1.15 | 2021-12-01 | [8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key | | 0.1.14 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count | From 70cd82d69c93ca264f0b2b5b7a4a9b8ca79e7042 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 13:24:43 -0800 Subject: [PATCH 17/29] delete dead code --- .../destination/mysql/MySQLDestinationTest.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index 4965be459a479..71090f343e78e 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -37,19 +37,7 @@ private JsonNode buildConfigWithExtraJDBCParameters(final String extraParam) { )); return config; } - - private JsonNode buildConfigWithExtraJDBCParametersNoSSL(final String extraParam) { - final JsonNode config = Jsons.jsonNode(ImmutableMap.of( - "host", "localhost", - "port", 1337, - "username", "user", - "database", "db", - "ssl", false, - "jdbc_url_params", extraParam - )); - return config; - } - + @Test void testNoExtraParams() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJDBCParameters()); From 1c7660e1ad424af57f53e4e2e3d7a970265e5946 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 13:25:00 -0800 Subject: [PATCH 18/29] update doc --- docs/integrations/destinations/mysql.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/integrations/destinations/mysql.md b/docs/integrations/destinations/mysql.md index 3227f8e48121a..35486908b5077 100644 --- a/docs/integrations/destinations/mysql.md +++ b/docs/integrations/destinations/mysql.md @@ -65,10 +65,10 @@ You should now have all the requirements needed to configure MySQL as a destinat The following JDBC URL parameters are set by Airbyte and cannot be overridden by the `jdbc_url_params` field: -* useSSL -* requireSSL -* verifyServerCertificate -* zeroDateTimeBehavior +* `useSSL=true` (unless `ssl` is set to false) +* `requireSSL=true` (unless `ssl` is set to false) +* `verifyServerCertificate=false` (unless `ssl` is set to false) +* `zeroDateTimeBehavior=convertToNull` ## Known Limitations From 6bfce23a3a2b9381c93c26d396890c78f62765ca Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 14:41:37 -0800 Subject: [PATCH 19/29] Throw exception with better error message --- .../integrations/destination/mysql/MySQLDestination.java | 8 +++++++- .../destination/mysql/MySQLDestinationTest.java | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 0f5fdce095cb5..744ebbe22a057 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -162,7 +162,13 @@ private Map getCustomJdbcParameters(final JsonNode config) { final String[] keyValuePairs = jdbcParams.split("&"); for (final String kv : keyValuePairs) { final String[] split = kv.split("="); - parameters.put(split[0], split[1]); + if (split.length == 2) { + parameters.put(split[0], split[1]); + } else { + throw new RuntimeException( + "jdbc_url_params must be formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). Got " + + jdbcParams); + } } } } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index 71090f343e78e..1e6d5197e0c7f 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -37,7 +37,7 @@ private JsonNode buildConfigWithExtraJDBCParameters(final String extraParam) { )); return config; } - + @Test void testNoExtraParams() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJDBCParameters()); From e6009780ae2323735191f79f33c16ec87fd5b0e3 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 14:44:52 -0800 Subject: [PATCH 20/29] Add missing test --- .../destination/mysql/MySQLDestinationTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index 1e6d5197e0c7f..8a367e4e9a056 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -78,4 +78,12 @@ void testExtraParamsWithDefaultParameter() { ); } } + + @Test + void testInvalidExtraParam() { + final String extraParam = "key1=value1&sdf&"; + assertThrows(RuntimeException.class, () -> { + getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)); + }); + } } From 4f9b1ded9c8554293acdfe237912d536e0493d44 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 15:01:48 -0800 Subject: [PATCH 21/29] Use MoreMaps::merge --- .../destination/mysql/MySQLDestination.java | 21 +++++++------------ .../mysql/MySQLDestinationTest.java | 6 +++--- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 744ebbe22a057..c463dff165624 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.map.MoreMaps; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.Destination; @@ -21,7 +22,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -126,15 +126,15 @@ private List getAdditionalParameters(final JsonNode config) { final Map customParameters = getCustomJdbcParameters(config); if (useSSL(config)) { - return convertToJdbcStrings(customParameters, List.of(DEFAULT_JDBC_PARAMETERS, SSL_JDBC_PARAMETERS)); + return convertToJdbcStrings(customParameters, MoreMaps.merge(DEFAULT_JDBC_PARAMETERS, SSL_JDBC_PARAMETERS)); } else { - return convertToJdbcStrings(customParameters, List.of(DEFAULT_JDBC_PARAMETERS)); + return convertToJdbcStrings(customParameters, DEFAULT_JDBC_PARAMETERS); } } - private List convertToJdbcStrings(final Map customParameters, final List> defaultParametersMaps) { - assertCustomParametersDontOverwriteDefaultParameters(customParameters, defaultParametersMaps); - return Streams.concat(Stream.of(customParameters), defaultParametersMaps.stream()) + private List convertToJdbcStrings(final Map customParameters, final Map defaultParametersMap) { + assertCustomParametersDontOverwriteDefaultParameters(customParameters, defaultParametersMap); + return Streams.concat(Stream.of(customParameters, defaultParametersMap)) .map(Map::entrySet) .flatMap(Collection::stream) .map(entry -> formatParameter(entry.getKey(), entry.getValue())) @@ -142,13 +142,8 @@ private List convertToJdbcStrings(final Map customParame } private void assertCustomParametersDontOverwriteDefaultParameters(final Map customParameters, - final List> defaultParametersMaps) { - final Set keys = defaultParametersMaps.stream() - .map(Map::keySet) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - - final List duplicateKeys = keys.stream().filter(customParameters::containsKey).toList(); + final Map defaultParametersMaps) { + final List duplicateKeys = defaultParametersMaps.keySet().stream().filter(customParameters::containsKey).toList(); if (!duplicateKeys.isEmpty()) { throw new RuntimeException("Cannot overwrite default JDBC parameter " + duplicateKeys); } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index 8a367e4e9a056..62aac01b31c0b 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -42,14 +42,14 @@ private JsonNode buildConfigWithExtraJDBCParameters(final String extraParam) { void testNoExtraParams() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJDBCParameters()); final String url = jdbcConfig.get("jdbc_url").asText(); - assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url); + assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true&", url); } @Test void testEmptyExtraParams() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters("")); final String url = jdbcConfig.get("jdbc_url").asText(); - assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url); + assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true&", url); } @Test @@ -58,7 +58,7 @@ void testExtraParams() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals( - "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&", + "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true&", url); } From 510c954bcbf69531098c39c3f4207932cdf7482c Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 15:07:22 -0800 Subject: [PATCH 22/29] Add missing tests --- .../mysql/MySQLDestinationTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index 62aac01b31c0b..9d73915b66cdd 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -38,6 +38,29 @@ private JsonNode buildConfigWithExtraJDBCParameters(final String extraParam) { return config; } + private JsonNode buildConfigWithExtraJDBCParametersWithNoSSL(final String extraParam) { + final JsonNode config = Jsons.jsonNode(ImmutableMap.of( + "host", "localhost", + "port", 1337, + "username", "user", + "database", "db", + "ssl", false, + "jdbc_url_params", extraParam + )); + return config; + } + + private JsonNode buildConfigNoExtraJDBCParametersWithoutSSL() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.of( + "host", "localhost", + "port", 1337, + "username", "user", + "database", "db", + "ssl", false + )); + return config; + } + @Test void testNoExtraParams() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJDBCParameters()); @@ -79,6 +102,25 @@ void testExtraParamsWithDefaultParameter() { } } + @Test + void testExtraParameterNoSsl() { + final String extraParam = "key1=value1&key2=value2&key3=value3"; + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParametersWithNoSSL(extraParam)); + final String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals( + "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&zeroDateTimeBehavior=convertToNull&", + url); + } + + @Test + void testNoExtraParameterNoSsl() { + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraJDBCParametersWithoutSSL()); + final String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals( + "jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&", + url); + } + @Test void testInvalidExtraParam() { final String extraParam = "key1=value1&sdf&"; From c53e6c8b4c67b28cc8a92eb11acbb9bbaa0cefd2 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 15:09:54 -0800 Subject: [PATCH 23/29] camel case --- .../mysql/MySQLDestinationTest.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index 9d73915b66cdd..e78d43a63ebc6 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -17,7 +17,7 @@ private MySQLDestination getDestination() { return result; } - private JsonNode buildConfigNoJDBCParameters() { + private JsonNode buildConfigNoJdbcParameters() { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, @@ -27,7 +27,7 @@ private JsonNode buildConfigNoJDBCParameters() { return config; } - private JsonNode buildConfigWithExtraJDBCParameters(final String extraParam) { + private JsonNode buildConfigWithExtraJdbcParameters(final String extraParam) { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, @@ -38,7 +38,7 @@ private JsonNode buildConfigWithExtraJDBCParameters(final String extraParam) { return config; } - private JsonNode buildConfigWithExtraJDBCParametersWithNoSSL(final String extraParam) { + private JsonNode buildConfigWithExtraJdbcParametersWithNoSsl(final String extraParam) { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, @@ -50,7 +50,7 @@ private JsonNode buildConfigWithExtraJDBCParametersWithNoSSL(final String extraP return config; } - private JsonNode buildConfigNoExtraJDBCParametersWithoutSSL() { + private JsonNode buildConfigNoExtraJdbcParametersWithoutSsl() { final JsonNode config = Jsons.jsonNode(ImmutableMap.of( "host", "localhost", "port", 1337, @@ -63,14 +63,14 @@ private JsonNode buildConfigNoExtraJDBCParametersWithoutSSL() { @Test void testNoExtraParams() { - final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJDBCParameters()); + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJdbcParameters()); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true&", url); } @Test void testEmptyExtraParams() { - final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters("")); + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters("")); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true&", url); } @@ -78,7 +78,7 @@ void testEmptyExtraParams() { @Test void testExtraParams() { final String extraParam = "key1=value1&key2=value2&key3=value3"; - final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)); + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam)); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals( "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true&", @@ -90,14 +90,14 @@ void testExtraParamsWithDefaultParameter() { for (final Entry entry : MySQLDestination.SSL_JDBC_PARAMETERS.entrySet()) { final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); assertThrows(RuntimeException.class, () -> - getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)) + getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam)) ); } for (final Entry entry : MySQLDestination.DEFAULT_JDBC_PARAMETERS.entrySet()) { final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); assertThrows(RuntimeException.class, () -> - getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)) + getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam)) ); } } @@ -105,7 +105,7 @@ void testExtraParamsWithDefaultParameter() { @Test void testExtraParameterNoSsl() { final String extraParam = "key1=value1&key2=value2&key3=value3"; - final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParametersWithNoSSL(extraParam)); + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParametersWithNoSsl(extraParam)); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals( "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&zeroDateTimeBehavior=convertToNull&", @@ -114,7 +114,7 @@ void testExtraParameterNoSsl() { @Test void testNoExtraParameterNoSsl() { - final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraJDBCParametersWithoutSSL()); + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraJdbcParametersWithoutSsl()); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals( "jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&", @@ -125,7 +125,7 @@ void testNoExtraParameterNoSsl() { void testInvalidExtraParam() { final String extraParam = "key1=value1&sdf&"; assertThrows(RuntimeException.class, () -> { - getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam)); + getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam)); }); } } From c1eccd40e878428a619f3f264236b308d41ab181 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 15:27:30 -0800 Subject: [PATCH 24/29] Allow colliding parameters if values are equal --- .../destination/mysql/MySQLDestination.java | 10 ++++---- .../mysql/MySQLDestinationTest.java | 23 +++++++++++-------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index c463dff165624..8904471a66da5 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -142,10 +143,11 @@ private List convertToJdbcStrings(final Map customParame } private void assertCustomParametersDontOverwriteDefaultParameters(final Map customParameters, - final Map defaultParametersMaps) { - final List duplicateKeys = defaultParametersMaps.keySet().stream().filter(customParameters::containsKey).toList(); - if (!duplicateKeys.isEmpty()) { - throw new RuntimeException("Cannot overwrite default JDBC parameter " + duplicateKeys); + final Map defaultParameters) { + for (final String key : defaultParameters.keySet()) { + if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) { + throw new RuntimeException("Cannot overwrite default JDBC parameter " + key); + } } } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index e78d43a63ebc6..f2833f6f70f06 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -1,5 +1,6 @@ package io.airbyte.integrations.destination.mysql; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.spy; @@ -7,6 +8,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.map.MoreMaps; +import java.util.Map; import java.util.Map.Entry; import org.junit.jupiter.api.Test; @@ -87,17 +90,19 @@ void testExtraParams() { @Test void testExtraParamsWithDefaultParameter() { - for (final Entry entry : MySQLDestination.SSL_JDBC_PARAMETERS.entrySet()) { - final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); - assertThrows(RuntimeException.class, () -> - getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam)) + final Map allDefaultParameters = MoreMaps.merge(MySQLDestination.SSL_JDBC_PARAMETERS, + MySQLDestination.DEFAULT_JDBC_PARAMETERS); + for (final Entry entry : allDefaultParameters.entrySet()) { + final String identicalParameter = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); + final String overridingParameter = MySQLDestination.formatParameter(entry.getKey(), "DIFFERENT_VALUE"); + + // Do not throw an exception if the values are equal + assertDoesNotThrow(() -> + getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(identicalParameter)).get("jdbc_url").asText() ); - } - - for (final Entry entry : MySQLDestination.DEFAULT_JDBC_PARAMETERS.entrySet()) { - final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); + // Throw an exception if the values are different assertThrows(RuntimeException.class, () -> - getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam)) + getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(overridingParameter)) ); } } From e9e2d7a31728c77efe0ef24f33c39fc551800bdf Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 15:38:13 -0800 Subject: [PATCH 25/29] Remove trailing & --- .../destination/mysql/MySQLDestination.java | 2 +- .../destination/mysql/MySQLDestinationTest.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 8904471a66da5..8a716cbff461a 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -109,7 +109,7 @@ public JsonNode toJdbcConfig(final JsonNode config) { // since zero dates are placeholders, we convert them to null by default if (!additionalParameters.isEmpty()) { jdbcUrl.append("?"); - additionalParameters.forEach(x -> jdbcUrl.append(x).append("&")); + jdbcUrl.append(String.join("&", additionalParameters)); } final ImmutableMap.Builder configBuilder = ImmutableMap.builder() diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index f2833f6f70f06..2f0012779c39f 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -68,14 +68,14 @@ private JsonNode buildConfigNoExtraJdbcParametersWithoutSsl() { void testNoExtraParams() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJdbcParameters()); final String url = jdbcConfig.get("jdbc_url").asText(); - assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true&", url); + assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true", url); } @Test void testEmptyExtraParams() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters("")); final String url = jdbcConfig.get("jdbc_url").asText(); - assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true&", url); + assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true", url); } @Test @@ -84,7 +84,7 @@ void testExtraParams() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam)); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals( - "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true&", + "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true", url); } @@ -113,7 +113,7 @@ void testExtraParameterNoSsl() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParametersWithNoSsl(extraParam)); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals( - "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&zeroDateTimeBehavior=convertToNull&", + "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&zeroDateTimeBehavior=convertToNull", url); } @@ -122,7 +122,7 @@ void testNoExtraParameterNoSsl() { final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraJdbcParametersWithoutSsl()); final String url = jdbcConfig.get("jdbc_url").asText(); assertEquals( - "jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&", + "jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull", url); } From 80352b656da4f9eae77582f348d283ea2954b87c Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 16:17:38 -0800 Subject: [PATCH 26/29] Throw IllegalArgumentException --- .../integrations/destination/mysql/MySQLDestination.java | 4 ++-- .../integrations/destination/mysql/MySQLDestinationTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 8a716cbff461a..f9049bf4949ac 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -146,7 +146,7 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map defaultParameters) { for (final String key : defaultParameters.keySet()) { if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) { - throw new RuntimeException("Cannot overwrite default JDBC parameter " + key); + throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key); } } } @@ -162,7 +162,7 @@ private Map getCustomJdbcParameters(final JsonNode config) { if (split.length == 2) { parameters.put(split[0], split[1]); } else { - throw new RuntimeException( + throw new IllegalArgumentException( "jdbc_url_params must be formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). Got " + jdbcParams); } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index 2f0012779c39f..aad36914cf401 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -101,7 +101,7 @@ void testExtraParamsWithDefaultParameter() { getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(identicalParameter)).get("jdbc_url").asText() ); // Throw an exception if the values are different - assertThrows(RuntimeException.class, () -> + assertThrows(IllegalArgumentException.class, () -> getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(overridingParameter)) ); } @@ -129,7 +129,7 @@ void testNoExtraParameterNoSsl() { @Test void testInvalidExtraParam() { final String extraParam = "key1=value1&sdf&"; - assertThrows(RuntimeException.class, () -> { + assertThrows(IllegalArgumentException.class, () -> { getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam)); }); } From d3a1263ff7491c7ca6bf6b973f6046f0e5a712f1 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 16:28:10 -0800 Subject: [PATCH 27/29] extract to constants --- .../destination/mysql/MySQLDestination.java | 39 +++++++++++-------- .../SshMySQLDestinationAcceptanceTest.java | 27 ++++++------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index f9049bf4949ac..ec2016a41dfcb 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -31,13 +31,18 @@ public class MySQLDestination extends AbstractJdbcDestination implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDestination.class); - public static final List HOST_KEY = List.of("host"); - public static final List PORT_KEY = List.of("port"); - - public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + public static final String DATABASE_KEY = "database"; + public static final String HOST_KEY = "host"; + public static final String JDBC_URL_KEY = "jdbc_url"; public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params"; + public static final String PASSWORD_KEY = "password"; + public static final String PORT_KEY = "port"; + public static final String SSL_KEY = "ssl"; + public static final String USERNAME_KEY = "username"; + public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + static final Map SSL_JDBC_PARAMETERS = ImmutableMap.of( "useSSL", "true", "requireSSL", "true", @@ -48,7 +53,7 @@ public class MySQLDestination extends AbstractJdbcDestination implements Destina ); public static Destination sshWrappedDestination() { - return new SshWrappedDestination(new MySQLDestination(), HOST_KEY, PORT_KEY); + return new SshWrappedDestination(new MySQLDestination(), List.of(HOST_KEY), List.of(PORT_KEY)); } @Override @@ -56,7 +61,7 @@ public AirbyteConnectionStatus check(final JsonNode config) { try (final JdbcDatabase database = getDatabase(config)) { final MySQLSqlOperations mySQLSqlOperations = (MySQLSqlOperations) getSqlOperations(); - final String outputSchema = getNamingResolver().getIdentifier(config.get("database").asText()); + final String outputSchema = getNamingResolver().getIdentifier(config.get(DATABASE_KEY).asText()); attemptSQLCreateAndDropTableOperations(outputSchema, database, getNamingResolver(), mySQLSqlOperations); @@ -87,9 +92,9 @@ protected JdbcDatabase getDatabase(final JsonNode config) { final JsonNode jdbcConfig = toJdbcConfig(config); return Databases.createJdbcDatabase( - jdbcConfig.get("username").asText(), - jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null, - jdbcConfig.get("jdbc_url").asText(), + jdbcConfig.get(USERNAME_KEY).asText(), + jdbcConfig.has(PASSWORD_KEY) ? jdbcConfig.get(PASSWORD_KEY).asText() : null, + jdbcConfig.get(JDBC_URL_KEY).asText(), getDriverClass(), "allowLoadLocalInfile=true"); } @@ -99,9 +104,9 @@ public JsonNode toJdbcConfig(final JsonNode config) { final List additionalParameters = getAdditionalParameters(config); final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s", - config.get("host").asText(), - config.get("port").asText(), - config.get("database").asText())); + config.get(HOST_KEY).asText(), + config.get(PORT_KEY).asText(), + config.get(DATABASE_KEY).asText())); // zero dates by default cannot be parsed into java date objects (they will throw an error) // in addition, users don't always have agency in fixing them e.g: maybe they don't own the database // and can't @@ -113,11 +118,11 @@ public JsonNode toJdbcConfig(final JsonNode config) { } final ImmutableMap.Builder configBuilder = ImmutableMap.builder() - .put("username", config.get("username").asText()) - .put("jdbc_url", jdbcUrl.toString()); + .put(USERNAME_KEY, config.get(USERNAME_KEY).asText()) + .put(JDBC_URL_KEY, jdbcUrl.toString()); - if (config.has("password")) { - configBuilder.put("password", config.get("password").asText()); + if (config.has(PASSWORD_KEY)) { + configBuilder.put(PASSWORD_KEY, config.get(PASSWORD_KEY).asText()); } return Jsons.jsonNode(configBuilder.build()); @@ -173,7 +178,7 @@ private Map getCustomJdbcParameters(final JsonNode config) { } private boolean useSSL(final JsonNode config) { - return !config.has("ssl") || config.get("ssl").asBoolean(); + return !config.has(SSL_KEY) || config.get(SSL_KEY).asBoolean(); } static String formatParameter(final String key, final String value) { diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java index 409737eaf9deb..0d1c1f18d1f72 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java @@ -25,12 +25,13 @@ import org.apache.commons.lang3.RandomStringUtils; /** - * Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file - * or with a password. + * Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file or with a password. */ public abstract class SshMySQLDestinationAcceptanceTest extends DestinationAcceptanceTest { private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer(); + private final List HOST_KEY = List.of(MySQLDestination.HOST_KEY); + private final List PORT_KEY = List.of(MySQLDestination.PORT_KEY); private String schemaName; public abstract Path getConfigFilePath(); @@ -60,9 +61,9 @@ protected JsonNode getFailCheckConfig() { @Override protected List retrieveRecords(final TestDestinationEnv env, - final String streamName, - final String namespace, - final JsonNode streamSchema) + final String streamName, + final String namespace, + final JsonNode streamSchema) throws Exception { return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() @@ -87,8 +88,8 @@ protected boolean implementsNamespaces() { @Override protected List retrieveNormalizedRecords(final TestDestinationEnv env, - final String streamName, - final String namespace) + final String streamName, + final String namespace) throws Exception { final var tableName = namingResolver.getIdentifier(streamName); final String schema = namespace != null ? namingResolver.getIdentifier(namespace) : namingResolver.getIdentifier(schemaName); @@ -121,8 +122,8 @@ private List retrieveRecordsFromTable(final String tableName, final St final var schema = schemaName == null ? this.schemaName : schemaName; return SshTunnel.sshWrap( getConfig(), - MySQLDestination.HOST_KEY, - MySQLDestination.PORT_KEY, + HOST_KEY, + PORT_KEY, (CheckedFunction, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig) .query( ctx -> ctx @@ -140,8 +141,8 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { final var config = getConfig(); SshTunnel.sshWrap( config, - MySQLDestination.HOST_KEY, - MySQLDestination.PORT_KEY, + HOST_KEY, + PORT_KEY, mangledConfig -> { getDatabaseFromConfig(mangledConfig).query(ctx -> ctx.fetch(String.format("CREATE DATABASE %s;", schemaName))); }); @@ -151,8 +152,8 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { protected void tearDown(final TestDestinationEnv testEnv) throws Exception { SshTunnel.sshWrap( getConfig(), - MySQLDestination.HOST_KEY, - MySQLDestination.PORT_KEY, + HOST_KEY, + PORT_KEY, mangledConfig -> { getDatabaseFromConfig(mangledConfig).query(ctx -> ctx.fetch(String.format("DROP DATABASE %s", schemaName))); }); From 449e2cfa0bc51a7744f783003a1dec2247d40685 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 18:00:57 -0800 Subject: [PATCH 28/29] Bump version in seed --- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index b8dab1d31509f..7a92b2efe8143 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -126,7 +126,7 @@ - name: MySQL destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42 dockerRepository: airbyte/destination-mysql - dockerImageTag: 0.1.16 + dockerImageTag: 0.1.17 documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql icon: mysql.svg - name: Oracle From 5800b6d26c33d837e207c3f8adb3474ff0a0df41 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 16 Feb 2022 18:01:56 -0800 Subject: [PATCH 29/29] Update destination specs --- .../init/src/main/resources/seed/destination_specs.yaml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 11478dc39b16f..3c17e189f3b0c 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -2463,7 +2463,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-mysql:0.1.16" +- dockerImage: "airbyte/destination-mysql:0.1.17" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/mysql" connectionSpecification: @@ -2514,6 +2514,13 @@ type: "boolean" default: true order: 5 + jdbc_url_params: + description: "Additional properties to pass to the JDBC URL string when\ + \ connecting to the database formatted as 'key=value' pairs separated\ + \ by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)." + title: "JDBC URL Params" + type: "string" + order: 6 tunnel_method: type: "object" title: "SSH Tunnel Method"