diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index b8dab1d31509f..7a92b2efe8143 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -126,7 +126,7 @@ - name: MySQL destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42 dockerRepository: airbyte/destination-mysql - dockerImageTag: 0.1.16 + dockerImageTag: 0.1.17 documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql icon: mysql.svg - name: Oracle diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 11478dc39b16f..3c17e189f3b0c 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -2463,7 +2463,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-mysql:0.1.16" +- dockerImage: "airbyte/destination-mysql:0.1.17" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/mysql" connectionSpecification: @@ -2514,6 +2514,13 @@ type: "boolean" default: true order: 5 + jdbc_url_params: + description: "Additional properties to pass to the JDBC URL string when\ + \ connecting to the database formatted as 'key=value' pairs separated\ + \ by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)." + title: "JDBC URL Params" + type: "string" + order: 6 tunnel_method: type: "object" title: "SSH Tunnel Method" diff --git a/airbyte-integrations/connectors/destination-mysql/Dockerfile b/airbyte-integrations/connectors/destination-mysql/Dockerfile index ac835aba62d67..b137bdd41e32b 100644 --- a/airbyte-integrations/connectors/destination-mysql/Dockerfile +++ b/airbyte-integrations/connectors/destination-mysql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.16 +LABEL io.airbyte.version=0.1.17 LABEL io.airbyte.name=airbyte/destination-mysql diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 7bc53a2beb549..ec2016a41dfcb 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -6,7 +6,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Streams; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.map.MoreMaps; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.Destination; @@ -16,21 +18,42 @@ import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; -import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MySQLDestination extends AbstractJdbcDestination implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDestination.class); - public static final List HOST_KEY = List.of("host"); - public static final List PORT_KEY = List.of("port"); + + public static final String DATABASE_KEY = "database"; + public static final String HOST_KEY = "host"; + public static final String JDBC_URL_KEY = "jdbc_url"; + public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params"; + public static final String PASSWORD_KEY = "password"; + public static final String PORT_KEY = "port"; + public static final String SSL_KEY = "ssl"; + public static final String USERNAME_KEY = "username"; public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + + static final Map SSL_JDBC_PARAMETERS = ImmutableMap.of( + "useSSL", "true", + "requireSSL", "true", + "verifyServerCertificate", "false" + ); + static final Map DEFAULT_JDBC_PARAMETERS = ImmutableMap.of( + "zeroDateTimeBehavior", "convertToNull" + ); public static Destination sshWrappedDestination() { - return new SshWrappedDestination(new MySQLDestination(), HOST_KEY, PORT_KEY); + return new SshWrappedDestination(new MySQLDestination(), List.of(HOST_KEY), List.of(PORT_KEY)); } @Override @@ -38,7 +61,7 @@ public AirbyteConnectionStatus check(final JsonNode config) { try (final JdbcDatabase database = getDatabase(config)) { final MySQLSqlOperations mySQLSqlOperations = (MySQLSqlOperations) getSqlOperations(); - final String outputSchema = getNamingResolver().getIdentifier(config.get("database").asText()); + final String outputSchema = getNamingResolver().getIdentifier(config.get(DATABASE_KEY).asText()); attemptSQLCreateAndDropTableOperations(outputSchema, database, getNamingResolver(), mySQLSqlOperations); @@ -69,49 +92,99 @@ protected JdbcDatabase getDatabase(final JsonNode config) { final JsonNode jdbcConfig = toJdbcConfig(config); return Databases.createJdbcDatabase( - jdbcConfig.get("username").asText(), - jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null, - jdbcConfig.get("jdbc_url").asText(), + jdbcConfig.get(USERNAME_KEY).asText(), + jdbcConfig.has(PASSWORD_KEY) ? jdbcConfig.get(PASSWORD_KEY).asText() : null, + jdbcConfig.get(JDBC_URL_KEY).asText(), getDriverClass(), "allowLoadLocalInfile=true"); } @Override public JsonNode toJdbcConfig(final JsonNode config) { - final List additionalParameters = new ArrayList<>(); - - if (!config.has("ssl") || config.get("ssl").asBoolean()) { - additionalParameters.add("useSSL=true"); - additionalParameters.add("requireSSL=true"); - additionalParameters.add("verifyServerCertificate=false"); - } + final List additionalParameters = getAdditionalParameters(config); final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s", - config.get("host").asText(), - config.get("port").asText(), - config.get("database").asText())); + config.get(HOST_KEY).asText(), + config.get(PORT_KEY).asText(), + config.get(DATABASE_KEY).asText())); // zero dates by default cannot be parsed into java date objects (they will throw an error) // in addition, users don't always have agency in fixing them e.g: maybe they don't own the database // and can't // remove zero date values. // since zero dates are placeholders, we convert them to null by default - jdbcUrl.append("?zeroDateTimeBehavior=convertToNull"); if (!additionalParameters.isEmpty()) { - jdbcUrl.append("&"); - additionalParameters.forEach(x -> jdbcUrl.append(x).append("&")); + jdbcUrl.append("?"); + jdbcUrl.append(String.join("&", additionalParameters)); } final ImmutableMap.Builder configBuilder = ImmutableMap.builder() - .put("username", config.get("username").asText()) - .put("jdbc_url", jdbcUrl.toString()); + .put(USERNAME_KEY, config.get(USERNAME_KEY).asText()) + .put(JDBC_URL_KEY, jdbcUrl.toString()); - if (config.has("password")) { - configBuilder.put("password", config.get("password").asText()); + if (config.has(PASSWORD_KEY)) { + configBuilder.put(PASSWORD_KEY, config.get(PASSWORD_KEY).asText()); } return Jsons.jsonNode(configBuilder.build()); } + private List getAdditionalParameters(final JsonNode config) { + final Map customParameters = getCustomJdbcParameters(config); + + if (useSSL(config)) { + return convertToJdbcStrings(customParameters, MoreMaps.merge(DEFAULT_JDBC_PARAMETERS, SSL_JDBC_PARAMETERS)); + } else { + return convertToJdbcStrings(customParameters, DEFAULT_JDBC_PARAMETERS); + } + } + + private List convertToJdbcStrings(final Map customParameters, final Map defaultParametersMap) { + assertCustomParametersDontOverwriteDefaultParameters(customParameters, defaultParametersMap); + return Streams.concat(Stream.of(customParameters, defaultParametersMap)) + .map(Map::entrySet) + .flatMap(Collection::stream) + .map(entry -> formatParameter(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); + } + + private void assertCustomParametersDontOverwriteDefaultParameters(final Map customParameters, + final Map defaultParameters) { + for (final String key : defaultParameters.keySet()) { + if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) { + throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key); + } + } + } + + private Map getCustomJdbcParameters(final JsonNode config) { + final Map parameters = new HashMap<>(); + if (config.has(JDBC_URL_PARAMS_KEY)) { + final String jdbcParams = config.get(JDBC_URL_PARAMS_KEY).asText(); + if (!jdbcParams.isBlank()) { + final String[] keyValuePairs = jdbcParams.split("&"); + for (final String kv : keyValuePairs) { + final String[] split = kv.split("="); + if (split.length == 2) { + parameters.put(split[0], split[1]); + } else { + throw new IllegalArgumentException( + "jdbc_url_params must be formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). Got " + + jdbcParams); + } + } + } + } + return parameters; + } + + private boolean useSSL(final JsonNode config) { + return !config.has(SSL_KEY) || config.get(SSL_KEY).asBoolean(); + } + + static String formatParameter(final String key, final String value) { + return String.format("%s=%s", key, value); + } + public static void main(final String[] args) throws Exception { final Destination destination = MySQLDestination.sshWrappedDestination(); LOGGER.info("starting destination: {}", MySQLDestination.class); diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json index e705623e1f89f..212dd541d84a7 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json @@ -3,12 +3,20 @@ "supportsIncremental": true, "supportsNormalization": true, "supportsDBT": true, - "supported_destination_sync_modes": ["overwrite", "append"], + "supported_destination_sync_modes": [ + "overwrite", + "append" + ], "connectionSpecification": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "MySQL Destination Spec", "type": "object", - "required": ["host", "port", "username", "database"], + "required": [ + "host", + "port", + "username", + "database" + ], "additionalProperties": true, "properties": { "host": { @@ -24,7 +32,9 @@ "minimum": 0, "maximum": 65536, "default": 3306, - "examples": ["3306"], + "examples": [ + "3306" + ], "order": 1 }, "database": { @@ -52,6 +62,12 @@ "type": "boolean", "default": true, "order": 5 + }, + "jdbc_url_params": { + "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).", + "title": "JDBC URL Params", + "type": "string", + "order": 6 } } } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java index 409737eaf9deb..0d1c1f18d1f72 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java @@ -25,12 +25,13 @@ import org.apache.commons.lang3.RandomStringUtils; /** - * Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file - * or with a password. + * Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file or with a password. */ public abstract class SshMySQLDestinationAcceptanceTest extends DestinationAcceptanceTest { private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer(); + private final List HOST_KEY = List.of(MySQLDestination.HOST_KEY); + private final List PORT_KEY = List.of(MySQLDestination.PORT_KEY); private String schemaName; public abstract Path getConfigFilePath(); @@ -60,9 +61,9 @@ protected JsonNode getFailCheckConfig() { @Override protected List retrieveRecords(final TestDestinationEnv env, - final String streamName, - final String namespace, - final JsonNode streamSchema) + final String streamName, + final String namespace, + final JsonNode streamSchema) throws Exception { return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() @@ -87,8 +88,8 @@ protected boolean implementsNamespaces() { @Override protected List retrieveNormalizedRecords(final TestDestinationEnv env, - final String streamName, - final String namespace) + final String streamName, + final String namespace) throws Exception { final var tableName = namingResolver.getIdentifier(streamName); final String schema = namespace != null ? namingResolver.getIdentifier(namespace) : namingResolver.getIdentifier(schemaName); @@ -121,8 +122,8 @@ private List retrieveRecordsFromTable(final String tableName, final St final var schema = schemaName == null ? this.schemaName : schemaName; return SshTunnel.sshWrap( getConfig(), - MySQLDestination.HOST_KEY, - MySQLDestination.PORT_KEY, + HOST_KEY, + PORT_KEY, (CheckedFunction, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig) .query( ctx -> ctx @@ -140,8 +141,8 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { final var config = getConfig(); SshTunnel.sshWrap( config, - MySQLDestination.HOST_KEY, - MySQLDestination.PORT_KEY, + HOST_KEY, + PORT_KEY, mangledConfig -> { getDatabaseFromConfig(mangledConfig).query(ctx -> ctx.fetch(String.format("CREATE DATABASE %s;", schemaName))); }); @@ -151,8 +152,8 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { protected void tearDown(final TestDestinationEnv testEnv) throws Exception { SshTunnel.sshWrap( getConfig(), - MySQLDestination.HOST_KEY, - MySQLDestination.PORT_KEY, + HOST_KEY, + PORT_KEY, mangledConfig -> { getDatabaseFromConfig(mangledConfig).query(ctx -> ctx.fetch(String.format("DROP DATABASE %s", schemaName))); }); diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java new file mode 100644 index 0000000000000..aad36914cf401 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -0,0 +1,136 @@ +package io.airbyte.integrations.destination.mysql; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.spy; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.map.MoreMaps; +import java.util.Map; +import java.util.Map.Entry; +import org.junit.jupiter.api.Test; + +public class MySQLDestinationTest { + + private MySQLDestination getDestination() { + final MySQLDestination result = spy(MySQLDestination.class); + return result; + } + + private JsonNode buildConfigNoJdbcParameters() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.of( + "host", "localhost", + "port", 1337, + "username", "user", + "database", "db" + )); + return config; + } + + private JsonNode buildConfigWithExtraJdbcParameters(final String extraParam) { + final JsonNode config = Jsons.jsonNode(ImmutableMap.of( + "host", "localhost", + "port", 1337, + "username", "user", + "database", "db", + "jdbc_url_params", extraParam + )); + return config; + } + + private JsonNode buildConfigWithExtraJdbcParametersWithNoSsl(final String extraParam) { + final JsonNode config = Jsons.jsonNode(ImmutableMap.of( + "host", "localhost", + "port", 1337, + "username", "user", + "database", "db", + "ssl", false, + "jdbc_url_params", extraParam + )); + return config; + } + + private JsonNode buildConfigNoExtraJdbcParametersWithoutSsl() { + final JsonNode config = Jsons.jsonNode(ImmutableMap.of( + "host", "localhost", + "port", 1337, + "username", "user", + "database", "db", + "ssl", false + )); + return config; + } + + @Test + void testNoExtraParams() { + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJdbcParameters()); + final String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true", url); + } + + @Test + void testEmptyExtraParams() { + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters("")); + final String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true", url); + } + + @Test + void testExtraParams() { + final String extraParam = "key1=value1&key2=value2&key3=value3"; + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam)); + final String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals( + "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true", + url); + } + + @Test + void testExtraParamsWithDefaultParameter() { + final Map allDefaultParameters = MoreMaps.merge(MySQLDestination.SSL_JDBC_PARAMETERS, + MySQLDestination.DEFAULT_JDBC_PARAMETERS); + for (final Entry entry : allDefaultParameters.entrySet()) { + final String identicalParameter = MySQLDestination.formatParameter(entry.getKey(), entry.getValue()); + final String overridingParameter = MySQLDestination.formatParameter(entry.getKey(), "DIFFERENT_VALUE"); + + // Do not throw an exception if the values are equal + assertDoesNotThrow(() -> + getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(identicalParameter)).get("jdbc_url").asText() + ); + // Throw an exception if the values are different + assertThrows(IllegalArgumentException.class, () -> + getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(overridingParameter)) + ); + } + } + + @Test + void testExtraParameterNoSsl() { + final String extraParam = "key1=value1&key2=value2&key3=value3"; + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParametersWithNoSsl(extraParam)); + final String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals( + "jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&zeroDateTimeBehavior=convertToNull", + url); + } + + @Test + void testNoExtraParameterNoSsl() { + final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraJdbcParametersWithoutSsl()); + final String url = jdbcConfig.get("jdbc_url").asText(); + assertEquals( + "jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull", + url); + } + + @Test + void testInvalidExtraParam() { + final String extraParam = "key1=value1&sdf&"; + assertThrows(IllegalArgumentException.class, () -> { + getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam)); + }); + } +} diff --git a/docs/integrations/destinations/mysql.md b/docs/integrations/destinations/mysql.md index b85993fd8b7ba..35486908b5077 100644 --- a/docs/integrations/destinations/mysql.md +++ b/docs/integrations/destinations/mysql.md @@ -59,6 +59,16 @@ You should now have all the requirements needed to configure MySQL as a destinat * **Username** * **Password** * **Database** +* **jdbc_url_params** (Optional) + +### Default JDBC URL Parameters + +The following JDBC URL parameters are set by Airbyte and cannot be overridden by the `jdbc_url_params` field: + +* `useSSL=true` (unless `ssl` is set to false) +* `requireSSL=true` (unless `ssl` is set to false) +* `verifyServerCertificate=false` (unless `ssl` is set to false) +* `zeroDateTimeBehavior=convertToNull` ## Known Limitations @@ -95,6 +105,7 @@ Using this feature requires additional configuration, when creating the destinat | Version | Date | Pull Request | Subject | |:--------| :--- | :--- | :--- | +| 0.1.17 | 2022-02-16 | [10362](https://github.com/airbytehq/airbyte/pull/10362) | Add jdbc_url_params support for optional JDBC parameters | | 0.1.16 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option | | 0.1.15 | 2021-12-01 | [8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key | | 0.1.14 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |