Skip to content

🎉 Destination MySQL: Add jdbc_url_params support for optional JDBC parameters #10362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4d751f4
pass through jdbc params
girarda Feb 15, 2022
9b6c5bd
fail if contains verifyServerCertificate
girarda Feb 15, 2022
c924611
do the same for all ssl params
girarda Feb 15, 2022
c959dd8
delete dead file
girarda Feb 15, 2022
4b7d8fe
slight refactor
girarda Feb 15, 2022
e401db1
new method
girarda Feb 15, 2022
76d8595
remove default value
girarda Feb 15, 2022
4e592ed
error message
girarda Feb 15, 2022
c90021b
rename
girarda Feb 15, 2022
59415f8
Merge branch 'master' into alex/mysql-jdbc-params
girarda Feb 15, 2022
51b13aa
update as per comments
girarda Feb 16, 2022
95e1a83
Update exception message
girarda Feb 16, 2022
607f101
Bump version
girarda Feb 16, 2022
c550028
extract to method
girarda Feb 16, 2022
097906f
Update doc
girarda Feb 16, 2022
0e9e8ea
Revert "Update doc"
girarda Feb 16, 2022
9744ba4
Update doc
girarda Feb 16, 2022
70cd82d
delete dead code
girarda Feb 16, 2022
1c7660e
update doc
girarda Feb 16, 2022
6bfce23
Throw exception with better error message
girarda Feb 16, 2022
e600978
Add missing test
girarda Feb 16, 2022
4f9b1de
Use MoreMaps::merge
girarda Feb 16, 2022
510c954
Add missing tests
girarda Feb 16, 2022
c53e6c8
camel case
girarda Feb 16, 2022
c1eccd4
Allow colliding parameters if values are equal
girarda Feb 16, 2022
e9e2d7a
Remove trailing &
girarda Feb 16, 2022
80352b6
Throw IllegalArgumentException
girarda Feb 17, 2022
d3a1263
extract to constants
girarda Feb 17, 2022
449e2cf
Bump version in seed
girarda Feb 17, 2022
5800b6d
Update destination specs
girarda Feb 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
Expand All @@ -18,6 +19,8 @@
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,6 +32,8 @@ public class MySQLDestination extends AbstractJdbcDestination implements Destina

public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";

public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params";

public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new MySQLDestination(), HOST_KEY, PORT_KEY);
}
Expand Down Expand Up @@ -78,13 +83,7 @@ protected JdbcDatabase getDatabase(final JsonNode config) {

@Override
public JsonNode toJdbcConfig(final JsonNode config) {
final List<String> additionalParameters = new ArrayList<>();

if (!config.has("ssl") || config.get("ssl").asBoolean()) {
additionalParameters.add("useSSL=true");
additionalParameters.add("requireSSL=true");
additionalParameters.add("verifyServerCertificate=false");
}
final List<String> additionalParameters = getAdditionalParameters(config);

final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s",
config.get("host").asText(),
Expand All @@ -95,6 +94,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {
// and can't
// remove zero date values.
// since zero dates are placeholders, we convert them to null by default
//FIXME(girarda): do we also want to prevent people from overriding zeroDateTimeBehavior?
jdbcUrl.append("?zeroDateTimeBehavior=convertToNull");
if (!additionalParameters.isEmpty()) {
jdbcUrl.append("&");
Expand All @@ -112,6 +112,61 @@ public JsonNode toJdbcConfig(final JsonNode config) {
return Jsons.jsonNode(configBuilder.build());
}

private List<String> getAdditionalParameters(final JsonNode config) {
final List<String> additionalParameters = new ArrayList<>();
addJDBCUrlParameters(config, additionalParameters);
if (useSSL(config)) {
addSSLJDBCParameters(additionalParameters);
}
return additionalParameters;
}

private void addJDBCUrlParameters(final JsonNode config, final List<String> additionalParameters) {
if (config.has(JDBC_URL_PARAMS_KEY)) {
final String additionalParams = config.get(JDBC_URL_PARAMS_KEY).asText();
if (additionalParams.isBlank()) {
return;
}
if (useSSL(config)) {
for (final String p : getSSLParameters().keySet()) {
if (containsParameterKey(additionalParams, p)) {
throw new RuntimeException("Cannot overwrite JDBC parameter " + p);
}
}
}
additionalParameters.add(additionalParams);
}
}

private boolean containsParameterKey(final String additionalParametersString, final String key) {
final String s = String.format("%s=", key);
return additionalParametersString.contains(s);
}

private void addSSLJDBCParameters(final List<String> additionalParameters) {
for (final Entry<String, String> sslParameter : getSSLParameters().entrySet()) {
final String param = formatParameter(sslParameter.getKey(), sslParameter.getValue());
additionalParameters.add(param);
}
}

private boolean useSSL(final JsonNode config) {
return !config.has("ssl") || config.get("ssl").asBoolean();
}

static Map<String, String> getSSLParameters() {
final Builder<String, String> builder = ImmutableMap.builder();
return builder
.put("useSSL", "true")
.put("requireSSL", "true")
.put("verifyServerCertificate", "false")
.build();
}

static String formatParameter(final String key, final String value) {
return String.format("%s=%s", key, value);
}

public static void main(final String[] args) throws Exception {
final Destination destination = MySQLDestination.sshWrappedDestination();
LOGGER.info("starting destination: {}", MySQLDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@
"supportsIncremental": true,
"supportsNormalization": true,
"supportsDBT": true,
"supported_destination_sync_modes": ["overwrite", "append"],
"supported_destination_sync_modes": [
"overwrite",
"append"
],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MySQL Destination Spec",
"type": "object",
"required": ["host", "port", "username", "database"],
"required": [
"host",
"port",
"username",
"database"
],
"additionalProperties": true,
"properties": {
"host": {
Expand All @@ -24,7 +32,9 @@
"minimum": 0,
"maximum": 65536,
"default": 3306,
"examples": ["3306"],
"examples": [
"3306"
],
"order": 1
},
"database": {
Expand Down Expand Up @@ -52,6 +62,12 @@
"type": "boolean",
"default": true,
"order": 5
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 6
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.airbyte.integrations.destination.mysql;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.spy;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import java.util.Map.Entry;
import org.junit.jupiter.api.Test;

public class MySQLDestinationTest {

private MySQLDestination getDestination() {
final MySQLDestination result = spy(MySQLDestination.class);
return result;
}

private JsonNode buildConfigNoJDBCParameters() {
final JsonNode config = Jsons.jsonNode(ImmutableMap.of(
"host", "localhost",
"port", 1337,
"username", "user",
"database", "db"
));
return config;
}

private JsonNode buildConfigWithExtraJDBCParameters(final String extraParam) {
final JsonNode config = Jsons.jsonNode(ImmutableMap.of(
"host", "localhost",
"port", 1337,
"username", "user",
"database", "db",
"jdbc_url_params", extraParam
));
return config;
}

private JsonNode buildConfigWithExtraJDBCParametersNoSSL(final String extraParam) {
final JsonNode config = Jsons.jsonNode(ImmutableMap.of(
"host", "localhost",
"port", 1337,
"username", "user",
"database", "db",
"ssl", false,
"jdbc_url_params", extraParam
));
return config;
}

@Test
void testNoExtraParams() {
final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigNoJDBCParameters());
final String url = jdbcConfig.get("jdbc_url").asText();
assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url);
}

@Test
void testEmptyExtraParams() {
final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(""));
final String url = jdbcConfig.get("jdbc_url").asText();
assertEquals("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&useSSL=true&requireSSL=true&verifyServerCertificate=false&", url);
}

@Test
void testExtraParams() {
final String extraParam = "key1=value1&key2=value2&key3=value3";
final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam));
final String url = jdbcConfig.get("jdbc_url").asText();
assertEquals(
"jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&key1=value1&key2=value2&key3=value3&useSSL=true&requireSSL=true&verifyServerCertificate=false&",
url);
}

@Test
void testExtraParamsWithSSLParameter() {
for (final Entry<String, String> entry : MySQLDestination.getSSLParameters().entrySet()) {
final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue());
assertThrows(RuntimeException.class, () ->
getDestination().toJdbcConfig(buildConfigWithExtraJDBCParameters(extraParam))
);
}
}

@Test
void testExtraParamsWithSSLParameterNoSSL() {
for (final Entry<String, String> entry : MySQLDestination.getSSLParameters().entrySet()) {
final String extraParam = MySQLDestination.formatParameter(entry.getKey(), entry.getValue());
final JsonNode jdbcConfig = getDestination().toJdbcConfig(buildConfigWithExtraJDBCParametersNoSSL(extraParam));
final String url = jdbcConfig.get("jdbc_url").asText();
final String expected = String.format("jdbc:mysql://localhost:1337/db?zeroDateTimeBehavior=convertToNull&%s&", extraParam);
assertEquals(expected,
url);
}
}
}