Skip to content

Allow sessionvariables jdbc url param in source-mysql #25859

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 23, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Maps;
import io.airbyte.commons.exceptions.ConfigErrorException;
import java.sql.JDBCType;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -108,7 +109,7 @@ public static Map<String, String> parseJdbcParameters(final String jdbcPropertie
if (split.length == 2) {
parameters.put(split[0], split[1]);
} else {
throw new IllegalArgumentException(
throw new ConfigErrorException(
"jdbc_url_params must be formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). Got "
+ jdbcPropertiesString);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static Map<String, String> getConnectionProperties(final JsonNode config)
* @param config A configuration used to check Jdbc connection
* @return A mapping of the default connection properties
*/
private static Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
public static Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
// NOTE that Postgres returns an empty map for some reason?
return JdbcUtils.parseJdbcParameters(config, "connection_properties", DEFAULT_JDBC_PARAMETERS_DELIMITER);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER;
import static io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters;
import static io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SSL_MODE;
import static java.util.stream.Collectors.toList;

Expand All @@ -17,15 +19,19 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.mysql.cj.MysqlType;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
Expand All @@ -35,6 +41,7 @@
import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcPosition;
import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcTargetPosition;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
Expand All @@ -55,13 +62,15 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -382,6 +391,60 @@ protected static String toSslJdbcParamInternal(final SslMode sslMode) {
return result;
}

@Override
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
// return super.createDatabase(sourceConfig, this::getConnectionProperties);
final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig);
// Create the data source
final DataSource dataSource = DataSourceFactory.create(
jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() : null,
jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null,
driverClass,
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(),
this.getConnectionProperties(sourceConfig));
// Record the data source so that it can be closed.
dataSources.add(dataSource);

final JdbcDatabase database = new StreamingJdbcDatabase(
dataSource,
sourceOperations,
streamingQueryConfigProvider);

quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString);
database.setSourceConfig(sourceConfig);
database.setDatabaseConfig(jdbcConfig);
return database;
}

public Map<String, String> getConnectionProperties(final JsonNode config) {
final Map<String, String> customProperties =
config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)
? parseJdbcParameters(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText(), DEFAULT_JDBC_PARAMETERS_DELIMITER) : new HashMap<>();
final Map<String, String> defaultProperties = JdbcDataSourceUtils.getDefaultConnectionProperties(config);
assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties);
return MoreMaps.merge(customProperties, defaultProperties);
}

public static Map<String, String> parseJdbcParameters(final String jdbcPropertiesString, final String delimiter) {
final Map<String, String> parameters = new HashMap<>();
if (!jdbcPropertiesString.isBlank()) {
final String[] keyValuePairs = jdbcPropertiesString.split(delimiter);
for (final String kv : keyValuePairs) {
final String[] split = kv.split("=");
if (split.length == 2) {
parameters.put(split[0], split[1]);
} else if (split.length == 3 && kv.contains("sessionVariables")) {
parameters.put(split[0], split[1] + "=" + split[2]);
} else {
throw new ConfigErrorException(
"jdbc_url_params must be formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). Got "
+ jdbcPropertiesString);
}
}
}
return parameters;
}

public static void main(final String[] args) throws Exception {
final Source source = MySqlSource.sshWrappedSource();
LOGGER.info("starting source: {}", MySqlSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -194,4 +195,38 @@ CREATE VIEW test_view_null_cursor(id) as

}


@Test
void testParseJdbcParameters() {
Map<String, String> parameters = MySqlSource.parseJdbcParameters("theAnswerToLiveAndEverything=42&sessionVariables=max_execution_time=10000&foo=bar", "&");
assertEquals("max_execution_time=10000", parameters.get("sessionVariables"));
assertEquals("42", parameters.get("theAnswerToLiveAndEverything"));
assertEquals("bar", parameters.get("foo"));
}

@Test
public void testJDBCSessionVariable() throws Exception {
// start DB
try (final MySQLContainer<?> container = new MySQLContainer<>("mysql:8.0")
.withUsername(TEST_USER)
.withPassword(TEST_PASSWORD)
.withEnv("MYSQL_ROOT_HOST", "%")
.withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD)
.withLogConsumer(new Slf4jLogConsumer(LOGGER))) {

container.start();
final Properties properties = new Properties();
properties.putAll(ImmutableMap.of("user", "root", JdbcUtils.PASSWORD_KEY, TEST_PASSWORD));
DriverManager.getConnection(container.getJdbcUrl(), properties);
final String dbName = Strings.addRandomSuffix("db", "_", 10);
final JsonNode config = getConfig(container, dbName, "sessionVariables=MAX_EXECUTION_TIME=28800000");

try (final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), properties)) {
connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n");
connection.createStatement().execute("CREATE DATABASE " + config.get(JdbcUtils.DATABASE_KEY).asText());
}
final AirbyteConnectionStatus check = new MySqlSource().check(config);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, check.getStatus());
}
}
}