|
8 | 8 | import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
|
9 | 9 | import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
|
10 | 10 | import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
|
| 11 | +import static io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER; |
| 12 | +import static io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils.assertCustomParametersDontOverwriteDefaultParameters; |
11 | 13 | import static io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SSL_MODE;
|
12 | 14 | import static java.util.stream.Collectors.toList;
|
13 | 15 |
|
|
17 | 19 | import com.google.common.collect.ImmutableMap;
|
18 | 20 | import com.google.common.collect.Lists;
|
19 | 21 | import com.mysql.cj.MysqlType;
|
| 22 | +import io.airbyte.commons.exceptions.ConfigErrorException; |
20 | 23 | import io.airbyte.commons.features.EnvVariableFeatureFlags;
|
21 | 24 | import io.airbyte.commons.features.FeatureFlags;
|
22 | 25 | import io.airbyte.commons.functional.CheckedConsumer;
|
23 | 26 | import io.airbyte.commons.json.Jsons;
|
| 27 | +import io.airbyte.commons.map.MoreMaps; |
24 | 28 | import io.airbyte.commons.util.AutoCloseableIterator;
|
25 | 29 | import io.airbyte.commons.util.AutoCloseableIterators;
|
| 30 | +import io.airbyte.db.factory.DataSourceFactory; |
26 | 31 | import io.airbyte.db.factory.DatabaseDriver;
|
27 | 32 | import io.airbyte.db.jdbc.JdbcDatabase;
|
28 | 33 | import io.airbyte.db.jdbc.JdbcUtils;
|
| 34 | +import io.airbyte.db.jdbc.StreamingJdbcDatabase; |
29 | 35 | import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
|
30 | 36 | import io.airbyte.integrations.base.IntegrationRunner;
|
31 | 37 | import io.airbyte.integrations.base.Source;
|
|
35 | 41 | import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcPosition;
|
36 | 42 | import io.airbyte.integrations.debezium.internals.mysql.MySqlCdcTargetPosition;
|
37 | 43 | import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
|
| 44 | +import io.airbyte.integrations.source.jdbc.JdbcDataSourceUtils; |
38 | 45 | import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils;
|
39 | 46 | import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode;
|
40 | 47 | import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
|
|
55 | 62 | import java.time.Instant;
|
56 | 63 | import java.util.ArrayList;
|
57 | 64 | import java.util.Collections;
|
| 65 | +import java.util.HashMap; |
58 | 66 | import java.util.List;
|
59 | 67 | import java.util.Map;
|
60 | 68 | import java.util.Objects;
|
61 | 69 | import java.util.Optional;
|
62 | 70 | import java.util.Set;
|
63 | 71 | import java.util.function.Supplier;
|
64 | 72 | import java.util.stream.Collectors;
|
| 73 | +import javax.sql.DataSource; |
65 | 74 | import org.slf4j.Logger;
|
66 | 75 | import org.slf4j.LoggerFactory;
|
67 | 76 |
|
@@ -382,6 +391,60 @@ protected static String toSslJdbcParamInternal(final SslMode sslMode) {
|
382 | 391 | return result;
|
383 | 392 | }
|
384 | 393 |
|
| 394 | + @Override |
| 395 | + public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException { |
| 396 | +// return super.createDatabase(sourceConfig, this::getConnectionProperties); |
| 397 | + final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig); |
| 398 | + // Create the data source |
| 399 | + final DataSource dataSource = DataSourceFactory.create( |
| 400 | + jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() : null, |
| 401 | + jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null, |
| 402 | + driverClass, |
| 403 | + jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(), |
| 404 | + this.getConnectionProperties(sourceConfig)); |
| 405 | + // Record the data source so that it can be closed. |
| 406 | + dataSources.add(dataSource); |
| 407 | + |
| 408 | + final JdbcDatabase database = new StreamingJdbcDatabase( |
| 409 | + dataSource, |
| 410 | + sourceOperations, |
| 411 | + streamingQueryConfigProvider); |
| 412 | + |
| 413 | + quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString); |
| 414 | + database.setSourceConfig(sourceConfig); |
| 415 | + database.setDatabaseConfig(jdbcConfig); |
| 416 | + return database; |
| 417 | + } |
| 418 | + |
| 419 | + public Map<String, String> getConnectionProperties(final JsonNode config) { |
| 420 | + final Map<String, String> customProperties = |
| 421 | + config.has(JdbcUtils.JDBC_URL_PARAMS_KEY) |
| 422 | + ? parseJdbcParameters(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText(), DEFAULT_JDBC_PARAMETERS_DELIMITER) : new HashMap<>(); |
| 423 | + final Map<String, String> defaultProperties = JdbcDataSourceUtils.getDefaultConnectionProperties(config); |
| 424 | + assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties); |
| 425 | + return MoreMaps.merge(customProperties, defaultProperties); |
| 426 | + } |
| 427 | + |
| 428 | + public static Map<String, String> parseJdbcParameters(final String jdbcPropertiesString, final String delimiter) { |
| 429 | + final Map<String, String> parameters = new HashMap<>(); |
| 430 | + if (!jdbcPropertiesString.isBlank()) { |
| 431 | + final String[] keyValuePairs = jdbcPropertiesString.split(delimiter); |
| 432 | + for (final String kv : keyValuePairs) { |
| 433 | + final String[] split = kv.split("="); |
| 434 | + if (split.length == 2) { |
| 435 | + parameters.put(split[0], split[1]); |
| 436 | + } else if (split.length == 3 && kv.contains("sessionVariables")) { |
| 437 | + parameters.put(split[0], split[1] + "=" + split[2]); |
| 438 | + } else { |
| 439 | + throw new ConfigErrorException( |
| 440 | + "jdbc_url_params must be formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3). Got " |
| 441 | + + jdbcPropertiesString); |
| 442 | + } |
| 443 | + } |
| 444 | + } |
| 445 | + return parameters; |
| 446 | + } |
| 447 | + |
385 | 448 | public static void main(final String[] args) throws Exception {
|
386 | 449 | final Source source = MySqlSource.sshWrappedSource();
|
387 | 450 | LOGGER.info("starting source: {}", MySqlSource.class);
|
|
0 commit comments