Skip to content

Allow sessionvariables jdbc url param in source-mysql #25859

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 23, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -410,14 +411,18 @@ protected long getActualCursorRecordCount(final Connection connection,

@Override
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
return createDatabase(sourceConfig, JdbcDataSourceUtils::getConnectionProperties);
}

protected JdbcDatabase createDatabase(final JsonNode sourceConfig, Function<JsonNode, Map<String, String>> connectionProperties) throws SQLException {
final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig);
// Create the data source
final DataSource dataSource = DataSourceFactory.create(
jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() : null,
jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null,
driverClass,
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(),
JdbcDataSourceUtils.getConnectionProperties(sourceConfig));
connectionProperties.apply(sourceConfig));
// Record the data source so that it can be closed.
dataSources.add(dataSource);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static Map<String, String> getConnectionProperties(final JsonNode config)
* @param config A configuration used to check Jdbc connection
* @return A mapping of the default connection properties
*/
private static Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
public static Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
// NOTE that Postgres returns an empty map for some reason?
return JdbcUtils.parseJdbcParameters(config, "connection_properties", DEFAULT_JDBC_PARAMETERS_DELIMITER);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER;
import static io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters;
import static io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SSL_MODE;
import static java.util.stream.Collectors.toList;

Expand All @@ -21,6 +23,7 @@
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.factory.DatabaseDriver;
Expand All @@ -35,6 +38,7 @@
import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcPosition;
import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcTargetPosition;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
Expand All @@ -55,6 +59,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -382,6 +387,40 @@ protected static String toSslJdbcParamInternal(final SslMode sslMode) {
return result;
}

@Override
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
return super.createDatabase(sourceConfig, this::getConnectionProperties);
}

public Map<String, String> getConnectionProperties(final JsonNode config) {
final Map<String, String> customProperties =
config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)
? parseJdbcParameters(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText(), DEFAULT_JDBC_PARAMETERS_DELIMITER) : new HashMap<>();
final Map<String, String> defaultProperties = JdbcDataSourceUtils.getDefaultConnectionProperties(config);
assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties);
return MoreMaps.merge(customProperties, defaultProperties);
}

public static Map<String, String> parseJdbcParameters(final String jdbcPropertiesString, final String delimiter) {
final Map<String, String> parameters = new HashMap<>();
if (!jdbcPropertiesString.isBlank()) {
final String[] keyValuePairs = jdbcPropertiesString.split(delimiter);
for (final String kv : keyValuePairs) {
final String[] split = kv.split("=");
if (split.length == 2) {
parameters.put(split[0], split[1]);
} else if (split.length == 3 && kv.contains("sessionVariables")) {
parameters.put(split[0], split[1] + "=" + split[2]);
} else {
throw new IllegalArgumentException(
"jdbc_url_params must be formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). Got "
+ jdbcPropertiesString);
}
}
}
return parameters;
}

public static void main(final String[] args) throws Exception {
final Source source = MySqlSource.sshWrappedSource();
LOGGER.info("starting source: {}", MySqlSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.jooq.DSLContext;
Expand Down Expand Up @@ -197,4 +198,13 @@ CREATE VIEW test_view_null_cursor(id) as

}


@Test
void testParseJdbcParameters() {
Map<String, String> parameters = MySqlSource.parseJdbcParameters("theAnswerToLiveAndEverything=42&sessionVariables=max_execution_time=10000&foo=bar", "&");
assertEquals("max_execution_time=10000", parameters.get("sessionVariables"));
assertEquals("42", parameters.get("theAnswerToLiveAndEverything"));
assertEquals("bar", parameters.get("foo"));
}

}