Skip to content

🎉 Destination MySQL: Add jdbc_url_params support for optional JDBC parameters #10362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4d751f4
pass through jdbc params
girarda Feb 15, 2022
9b6c5bd
fail if contains verifyServerCertificate
girarda Feb 15, 2022
c924611
do the same for all ssl params
girarda Feb 15, 2022
c959dd8
delete dead file
girarda Feb 15, 2022
4b7d8fe
slight refactor
girarda Feb 15, 2022
e401db1
new method
girarda Feb 15, 2022
76d8595
remove default value
girarda Feb 15, 2022
4e592ed
error message
girarda Feb 15, 2022
c90021b
rename
girarda Feb 15, 2022
59415f8
Merge branch 'master' into alex/mysql-jdbc-params
girarda Feb 15, 2022
51b13aa
update as per comments
girarda Feb 16, 2022
95e1a83
Update exception message
girarda Feb 16, 2022
607f101
Bump version
girarda Feb 16, 2022
c550028
extract to method
girarda Feb 16, 2022
097906f
Update doc
girarda Feb 16, 2022
0e9e8ea
Revert "Update doc"
girarda Feb 16, 2022
9744ba4
Update doc
girarda Feb 16, 2022
70cd82d
delete dead code
girarda Feb 16, 2022
1c7660e
update doc
girarda Feb 16, 2022
6bfce23
Throw exception with better error message
girarda Feb 16, 2022
e600978
Add missing test
girarda Feb 16, 2022
4f9b1de
Use MoreMaps::merge
girarda Feb 16, 2022
510c954
Add missing tests
girarda Feb 16, 2022
c53e6c8
camel case
girarda Feb 16, 2022
c1eccd4
Allow colliding parameters if values are equal
girarda Feb 16, 2022
e9e2d7a
Remove trailing &
girarda Feb 16, 2022
80352b6
Throw IllegalArgumentException
girarda Feb 17, 2022
d3a1263
extract to constants
girarda Feb 17, 2022
449e2cf
Bump version in seed
girarda Feb 17, 2022
5800b6d
Update destination specs
girarda Feb 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.16
LABEL io.airbyte.version=0.1.17
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Destination;
Expand All @@ -16,8 +18,13 @@
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,6 +36,17 @@ public class MySQLDestination extends AbstractJdbcDestination implements Destina

public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";

public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params";

static final Map<String, String> SSL_JDBC_PARAMETERS = ImmutableMap.of(
"useSSL", "true",
"requireSSL", "true",
"verifyServerCertificate", "false"
);
static final Map<String, String> DEFAULT_JDBC_PARAMETERS = ImmutableMap.of(
"zeroDateTimeBehavior", "convertToNull"
);

public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new MySQLDestination(), HOST_KEY, PORT_KEY);
}
Expand Down Expand Up @@ -78,13 +96,7 @@ protected JdbcDatabase getDatabase(final JsonNode config) {

@Override
public JsonNode toJdbcConfig(final JsonNode config) {
final List<String> additionalParameters = new ArrayList<>();

if (!config.has("ssl") || config.get("ssl").asBoolean()) {
additionalParameters.add("useSSL=true");
additionalParameters.add("requireSSL=true");
additionalParameters.add("verifyServerCertificate=false");
}
final List<String> additionalParameters = getAdditionalParameters(config);

final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s",
config.get("host").asText(),
Expand All @@ -95,10 +107,9 @@ public JsonNode toJdbcConfig(final JsonNode config) {
// and can't
// remove zero date values.
// since zero dates are placeholders, we convert them to null by default
jdbcUrl.append("?zeroDateTimeBehavior=convertToNull");
if (!additionalParameters.isEmpty()) {
jdbcUrl.append("&");
additionalParameters.forEach(x -> jdbcUrl.append(x).append("&"));
jdbcUrl.append("?");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

append "?" instead of "&" since this zeroDateTimeBehavior=convertToNull is already included in additionalParameters

jdbcUrl.append(String.join("&", additionalParameters));
}

final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
Expand All @@ -112,6 +123,63 @@ public JsonNode toJdbcConfig(final JsonNode config) {
return Jsons.jsonNode(configBuilder.build());
}

private List<String> getAdditionalParameters(final JsonNode config) {
final Map<String, String> customParameters = getCustomJdbcParameters(config);

if (useSSL(config)) {
return convertToJdbcStrings(customParameters, MoreMaps.merge(DEFAULT_JDBC_PARAMETERS, SSL_JDBC_PARAMETERS));
} else {
return convertToJdbcStrings(customParameters, DEFAULT_JDBC_PARAMETERS);
}
}

private List<String> convertToJdbcStrings(final Map<String, String> customParameters, final Map<String, String> defaultParametersMap) {
assertCustomParametersDontOverwriteDefaultParameters(customParameters, defaultParametersMap);
return Streams.concat(Stream.of(customParameters, defaultParametersMap))
.map(Map::entrySet)
.flatMap(Collection::stream)
.map(entry -> formatParameter(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}

private void assertCustomParametersDontOverwriteDefaultParameters(final Map<String, String> customParameters,
final Map<String, String> defaultParameters) {
for (final String key : defaultParameters.keySet()) {
if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) {
throw new RuntimeException("Cannot overwrite default JDBC parameter " + key);
}
}
}

private Map<String, String> getCustomJdbcParameters(final JsonNode config) {
final Map<String, String> parameters = new HashMap<>();
if (config.has(JDBC_URL_PARAMS_KEY)) {
final String jdbcParams = config.get(JDBC_URL_PARAMS_KEY).asText();
if (!jdbcParams.isBlank()) {
final String[] keyValuePairs = jdbcParams.split("&");
for (final String kv : keyValuePairs) {
final String[] split = kv.split("=");
if (split.length == 2) {
parameters.put(split[0], split[1]);
} else {
throw new RuntimeException(
"jdbc_url_params must be formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). Got "
+ jdbcParams);
}
}
}
}
return parameters;
}

private boolean useSSL(final JsonNode config) {
return !config.has("ssl") || config.get("ssl").asBoolean();
}

static String formatParameter(final String key, final String value) {
return String.format("%s=%s", key, value);
}

public static void main(final String[] args) throws Exception {
final Destination destination = MySQLDestination.sshWrappedDestination();
LOGGER.info("starting destination: {}", MySQLDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@
"supportsIncremental": true,
"supportsNormalization": true,
"supportsDBT": true,
"supported_destination_sync_modes": ["overwrite", "append"],
"supported_destination_sync_modes": [
"overwrite",
"append"
],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MySQL Destination Spec",
"type": "object",
"required": ["host", "port", "username", "database"],
"required": [
"host",
"port",
"username",
"database"
],
"additionalProperties": true,
"properties": {
"host": {
Expand All @@ -24,7 +32,9 @@
"minimum": 0,
"maximum": 65536,
"default": 3306,
"examples": ["3306"],
"examples": [
"3306"
],
"order": 1
},
"database": {
Expand Down Expand Up @@ -52,6 +62,12 @@
"type": "boolean",
"default": true,
"order": 5
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 6
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package io.airbyte.integrations.destination.mysql;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.spy;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import java.util.Map;
import java.util.Map.Entry;
import org.junit.jupiter.api.Test;

public class MySQLDestinationTest {

private MySQLDestination getDestination() {
final MySQLDestination result = spy(MySQLDestination.class);
return result;
}

private JsonNode buildConfigNoJdbcParameters() {
final JsonNode config = Jsons.jsonNode(ImmutableMap.of(
"host", "localhost",
"port", 1337,
"username", "user",
"database", "db"
));
return config;
}

private JsonNode buildConfigWithExtraJdbcParameters(final String extraParam) {
final JsonNode config = Jsons.jsonNode(ImmutableMap.of(
"host", "localhost",
"port", 1337,
"username", "user",
"database", "db",
"jdbc_url_params", extraParam
));
return config;
}

private JsonNode buildConfigWithExtraJdbcParametersWithNoSsl(final String extraParam) {
final JsonNode config = Jsons.jsonNode(ImmutableMap.of(
"host", "localhost",
"port", 1337,
"username", "user",
"database", "db",
"ssl", false,
"jdbc_url_params", extraParam
));
return config;
}

private JsonNode buildConfigNoExtraJdbcParametersWithoutSsl() {
final JsonNode config = Jsons.jsonNode(ImmutableMap.of(
"host", "localhost",
"port", 1337,
"username", "user",
"database", "db",
"ssl", false
));
return config;
}

@Test
void testNoExtraParams() {
final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJdbcParameters());
final String url = jdbcConfig.get("jdbc_url").asText();
assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true", url);
}

@Test
void testEmptyExtraParams() {
final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(""));
final String url = jdbcConfig.get("jdbc_url").asText();
assertEquals("jdbc:mysql://localhost:1337/db?verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true", url);
}

@Test
void testExtraParams() {
final String extraParam = "key1=value1&key2=value2&key3=value3";
final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam));
final String url = jdbcConfig.get("jdbc_url").asText();
assertEquals(
"jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&verifyServerCertificate=false&zeroDateTimeBehavior=convertToNull&requireSSL=true&useSSL=true",
url);
}

@Test
void testExtraParamsWithDefaultParameter() {
final Map<String, String> allDefaultParameters = MoreMaps.merge(MySQLDestination.SSL_JDBC_PARAMETERS,
MySQLDestination.DEFAULT_JDBC_PARAMETERS);
for (final Entry<String, String> entry : allDefaultParameters.entrySet()) {
final String identicalParameter = MySQLDestination.formatParameter(entry.getKey(), entry.getValue());
final String overridingParameter = MySQLDestination.formatParameter(entry.getKey(), "DIFFERENT_VALUE");

// Do not throw an exception if the values are equal
assertDoesNotThrow(() ->
getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(identicalParameter)).get("jdbc_url").asText()
);
// Throw an exception if the values are different
assertThrows(RuntimeException.class, () ->
getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(overridingParameter))
);
}
}

@Test
void testExtraParameterNoSsl() {
final String extraParam = "key1=value1&key2=value2&key3=value3";
final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJdbcParametersWithNoSsl(extraParam));
final String url = jdbcConfig.get("jdbc_url").asText();
assertEquals(
"jdbc:mysql://localhost:1337/db?key1=value1&key2=value2&key3=value3&zeroDateTimeBehavior=convertToNull",
url);
}

@Test
void testNoExtraParameterNoSsl() {
final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoExtraJdbcParametersWithoutSsl());
final String url = jdbcConfig.get("jdbc_url").asText();
assertEquals(
"jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull",
url);
}

@Test
void testInvalidExtraParam() {
final String extraParam = "key1=value1&sdf&";
assertThrows(RuntimeException.class, () -> {
getDestination().toJdbcConfig(buildConfigWithExtraJdbcParameters(extraParam));
});
}
}
11 changes: 11 additions & 0 deletions docs/integrations/destinations/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ You should now have all the requirements needed to configure MySQL as a destinat
* **Username**
* **Password**
* **Database**
* **jdbc_url_params** (Optional)

### Default JDBC URL Parameters

The following JDBC URL parameters are set by Airbyte and cannot be overridden by the `jdbc_url_params` field:

* `useSSL=true` (unless `ssl` is set to false)
* `requireSSL=true` (unless `ssl` is set to false)
* `verifyServerCertificate=false` (unless `ssl` is set to false)
* `zeroDateTimeBehavior=convertToNull`

## Known Limitations

Expand Down Expand Up @@ -95,6 +105,7 @@ Using this feature requires additional configuration, when creating the destinat

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.1.17 | 2022-02-16 | [10362](https://github.com/airbytehq/airbyte/pull/10362) | Add jdbc_url_params support for optional JDBC parameters |
| 0.1.16 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.15 | 2021-12-01 | [8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key |
| 0.1.14 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
Expand Down