Skip to content

🎉 Destination MySQL: Add jdbc_url_params support for optional JDBC parameters #10362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4d751f4
pass through jdbc params
girarda Feb 15, 2022
9b6c5bd
fail if contains verifyServerCertificate
girarda Feb 15, 2022
c924611
do the same for all ssl params
girarda Feb 15, 2022
c959dd8
delete dead file
girarda Feb 15, 2022
4b7d8fe
slight refactor
girarda Feb 15, 2022
e401db1
new method
girarda Feb 15, 2022
76d8595
remove default value
girarda Feb 15, 2022
4e592ed
error message
girarda Feb 15, 2022
c90021b
rename
girarda Feb 15, 2022
59415f8
Merge branch 'master' into alex/mysql-jdbc-params
girarda Feb 15, 2022
51b13aa
update as per comments
girarda Feb 16, 2022
95e1a83
Update exception message
girarda Feb 16, 2022
607f101
Bump version
girarda Feb 16, 2022
c550028
extract to method
girarda Feb 16, 2022
097906f
Update doc
girarda Feb 16, 2022
0e9e8ea
Revert "Update doc"
girarda Feb 16, 2022
9744ba4
Update doc
girarda Feb 16, 2022
70cd82d
delete dead code
girarda Feb 16, 2022
1c7660e
update doc
girarda Feb 16, 2022
6bfce23
Throw exception with better error message
girarda Feb 16, 2022
e600978
Add missing test
girarda Feb 16, 2022
4f9b1de
Use MoreMaps::merge
girarda Feb 16, 2022
510c954
Add missing tests
girarda Feb 16, 2022
c53e6c8
camel case
girarda Feb 16, 2022
c1eccd4
Allow colliding parameters if values are equal
girarda Feb 16, 2022
e9e2d7a
Remove trailing &
girarda Feb 16, 2022
80352b6
Throw IllegalArgumentException
girarda Feb 17, 2022
d3a1263
extract to constants
girarda Feb 17, 2022
449e2cf
Bump version in seed
girarda Feb 17, 2022
5800b6d
Update destination specs
girarda Feb 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
- name: MySQL
destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.16
dockerImageTag: 0.1.17
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
icon: mysql.svg
- name: Oracle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2463,7 +2463,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-mysql:0.1.16"
- dockerImage: "airbyte/destination-mysql:0.1.17"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mysql"
connectionSpecification:
Expand Down Expand Up @@ -2514,6 +2514,13 @@
type: "boolean"
default: true
order: 5
jdbc_url_params:
description: "Additional properties to pass to the JDBC URL string when\
\ connecting to the database formatted as 'key=value' pairs separated\
\ by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)."
title: "JDBC URL Params"
type: "string"
order: 6
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.16
LABEL io.airbyte.version=0.1.17
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Destination;
Expand All @@ -16,29 +18,50 @@
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySQLDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDestination.class);
public static final List<String> HOST_KEY = List.of("host");
public static final List<String> PORT_KEY = List.of("port");

public static final String DATABASE_KEY = "database";
public static final String HOST_KEY = "host";
public static final String JDBC_URL_KEY = "jdbc_url";
public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params";
public static final String PASSWORD_KEY = "password";
public static final String PORT_KEY = "port";
public static final String SSL_KEY = "ssl";
public static final String USERNAME_KEY = "username";

public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";

static final Map<String, String> SSL_JDBC_PARAMETERS = ImmutableMap.of(
"useSSL", "true",
"requireSSL", "true",
"verifyServerCertificate", "false"
);
static final Map<String, String> DEFAULT_JDBC_PARAMETERS = ImmutableMap.of(
"zeroDateTimeBehavior", "convertToNull"
);

public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new MySQLDestination(), HOST_KEY, PORT_KEY);
return new SshWrappedDestination(new MySQLDestination(), List.of(HOST_KEY), List.of(PORT_KEY));
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
try (final JdbcDatabase database = getDatabase(config)) {
final MySQLSqlOperations mySQLSqlOperations = (MySQLSqlOperations) getSqlOperations();

final String outputSchema = getNamingResolver().getIdentifier(config.get("database").asText());
final String outputSchema = getNamingResolver().getIdentifier(config.get(DATABASE_KEY).asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, getNamingResolver(),
mySQLSqlOperations);

Expand Down Expand Up @@ -69,49 +92,99 @@ protected JdbcDatabase getDatabase(final JsonNode config) {
final JsonNode jdbcConfig = toJdbcConfig(config);

return Databases.createJdbcDatabase(
jdbcConfig.get("username").asText(),
jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null,
jdbcConfig.get("jdbc_url").asText(),
jdbcConfig.get(USERNAME_KEY).asText(),
jdbcConfig.has(PASSWORD_KEY) ? jdbcConfig.get(PASSWORD_KEY).asText() : null,
jdbcConfig.get(JDBC_URL_KEY).asText(),
getDriverClass(),
"allowLoadLocalInfile=true");
}

@Override
public JsonNode toJdbcConfig(final JsonNode config) {
final List<String> additionalParameters = new ArrayList<>();

if (!config.has("ssl") || config.get("ssl").asBoolean()) {
additionalParameters.add("useSSL=true");
additionalParameters.add("requireSSL=true");
additionalParameters.add("verifyServerCertificate=false");
}
final List<String> additionalParameters = getAdditionalParameters(config);

final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()));
config.get(HOST_KEY).asText(),
config.get(PORT_KEY).asText(),
config.get(DATABASE_KEY).asText()));
// zero dates by default cannot be parsed into java date objects (they will throw an error)
// in addition, users don't always have agency in fixing them e.g: maybe they don't own the database
// and can't
// remove zero date values.
// since zero dates are placeholders, we convert them to null by default
jdbcUrl.append("?zeroDateTimeBehavior=convertToNull");
if (!additionalParameters.isEmpty()) {
jdbcUrl.append("&");
additionalParameters.forEach(x -> jdbcUrl.append(x).append("&"));
jdbcUrl.append("?");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

append "?" instead of "&" since this zeroDateTimeBehavior=convertToNull is already included in additionalParameters

jdbcUrl.append(String.join("&", additionalParameters));
}

final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", jdbcUrl.toString());
.put(USERNAME_KEY, config.get(USERNAME_KEY).asText())
.put(JDBC_URL_KEY, jdbcUrl.toString());

if (config.has("password")) {
configBuilder.put("password", config.get("password").asText());
if (config.has(PASSWORD_KEY)) {
configBuilder.put(PASSWORD_KEY, config.get(PASSWORD_KEY).asText());
}

return Jsons.jsonNode(configBuilder.build());
}

private List<String> getAdditionalParameters(final JsonNode config) {
final Map<String, String> customParameters = getCustomJdbcParameters(config);

if (useSSL(config)) {
return convertToJdbcStrings(customParameters, MoreMaps.merge(DEFAULT_JDBC_PARAMETERS, SSL_JDBC_PARAMETERS));
} else {
return convertToJdbcStrings(customParameters, DEFAULT_JDBC_PARAMETERS);
}
}

private List<String> convertToJdbcStrings(final Map<String, String> customParameters, final Map<String, String> defaultParametersMap) {
assertCustomParametersDontOverwriteDefaultParameters(customParameters, defaultParametersMap);
return Streams.concat(Stream.of(customParameters, defaultParametersMap))
.map(Map::entrySet)
.flatMap(Collection::stream)
.map(entry -> formatParameter(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}

private void assertCustomParametersDontOverwriteDefaultParameters(final Map<String, String> customParameters,
final Map<String, String> defaultParameters) {
for (final String key : defaultParameters.keySet()) {
if (customParameters.containsKey(key) && !Objects.equals(customParameters.get(key), defaultParameters.get(key))) {
throw new IllegalArgumentException("Cannot overwrite default JDBC parameter " + key);
}
}
}

private Map<String, String> getCustomJdbcParameters(final JsonNode config) {
final Map<String, String> parameters = new HashMap<>();
if (config.has(JDBC_URL_PARAMS_KEY)) {
final String jdbcParams = config.get(JDBC_URL_PARAMS_KEY).asText();
if (!jdbcParams.isBlank()) {
final String[] keyValuePairs = jdbcParams.split("&");
for (final String kv : keyValuePairs) {
final String[] split = kv.split("=");
if (split.length == 2) {
parameters.put(split[0], split[1]);
} else {
throw new IllegalArgumentException(
"jdbc_url_params must be formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). Got "
+ jdbcParams);
}
}
}
}
return parameters;
}

private boolean useSSL(final JsonNode config) {
return !config.has(SSL_KEY) || config.get(SSL_KEY).asBoolean();
}

static String formatParameter(final String key, final String value) {
return String.format("%s=%s", key, value);
}

public static void main(final String[] args) throws Exception {
final Destination destination = MySQLDestination.sshWrappedDestination();
LOGGER.info("starting destination: {}", MySQLDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@
"supportsIncremental": true,
"supportsNormalization": true,
"supportsDBT": true,
"supported_destination_sync_modes": ["overwrite", "append"],
"supported_destination_sync_modes": [
"overwrite",
"append"
],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MySQL Destination Spec",
"type": "object",
"required": ["host", "port", "username", "database"],
"required": [
"host",
"port",
"username",
"database"
],
"additionalProperties": true,
"properties": {
"host": {
Expand All @@ -24,7 +32,9 @@
"minimum": 0,
"maximum": 65536,
"default": 3306,
"examples": ["3306"],
"examples": [
"3306"
],
"order": 1
},
"database": {
Expand Down Expand Up @@ -52,6 +62,12 @@
"type": "boolean",
"default": true,
"order": 5
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 6
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.apache.commons.lang3.RandomStringUtils;

/**
* Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file
* or with a password.
* Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file or with a password.
*/
public abstract class SshMySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {

private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
private final List<String> HOST_KEY = List.of(MySQLDestination.HOST_KEY);
private final List<String> PORT_KEY = List.of(MySQLDestination.PORT_KEY);
private String schemaName;

public abstract Path getConfigFilePath();
Expand Down Expand Up @@ -60,9 +61,9 @@ protected JsonNode getFailCheckConfig() {

@Override
protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
final String streamName,
final String namespace,
final JsonNode streamSchema)
final String streamName,
final String namespace,
final JsonNode streamSchema)
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
Expand All @@ -87,8 +88,8 @@ protected boolean implementsNamespaces() {

@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env,
final String streamName,
final String namespace)
final String streamName,
final String namespace)
throws Exception {
final var tableName = namingResolver.getIdentifier(streamName);
final String schema = namespace != null ? namingResolver.getIdentifier(namespace) : namingResolver.getIdentifier(schemaName);
Expand Down Expand Up @@ -121,8 +122,8 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
final var schema = schemaName == null ? this.schemaName : schemaName;
return SshTunnel.sshWrap(
getConfig(),
MySQLDestination.HOST_KEY,
MySQLDestination.PORT_KEY,
HOST_KEY,
PORT_KEY,
(CheckedFunction<JsonNode, List<JsonNode>, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig)
.query(
ctx -> ctx
Expand All @@ -140,8 +141,8 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
final var config = getConfig();
SshTunnel.sshWrap(
config,
MySQLDestination.HOST_KEY,
MySQLDestination.PORT_KEY,
HOST_KEY,
PORT_KEY,
mangledConfig -> {
getDatabaseFromConfig(mangledConfig).query(ctx -> ctx.fetch(String.format("CREATE DATABASE %s;", schemaName)));
});
Expand All @@ -151,8 +152,8 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
SshTunnel.sshWrap(
getConfig(),
MySQLDestination.HOST_KEY,
MySQLDestination.PORT_KEY,
HOST_KEY,
PORT_KEY,
mangledConfig -> {
getDatabaseFromConfig(mangledConfig).query(ctx -> ctx.fetch(String.format("DROP DATABASE %s", schemaName)));
});
Expand Down
Loading