Skip to content

Commit fda6340

Browse files
committed
🐛 source-mysql Support special chars in dbname (#34580)
1 parent d6a0d50 commit fda6340

File tree

13 files changed

+159
-27
lines changed

13 files changed

+159
-27
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ MavenLocal debugging steps:
166166

167167
| Version | Date | Pull Request | Subject |
168168
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
169+
| 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. |
169170
| 0.20.2 | 2024-02-12 | [\#35144](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. |
170171
| 0.20.1 | 2024-02-11 | [\#35111](https://github.com/airbytehq/airbyte/pull/35111) | Fix GlobalAsyncStateManager stats counting logic. |
171172
| 0.20.0 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Add new test cases to BaseTypingDedupingTest to exercise special characters. |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.20.2
1+
version=0.20.3

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.java

+30-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
9+
import io.debezium.spi.common.ReplacementFunction;
910
import java.util.Optional;
1011
import java.util.Properties;
1112

@@ -76,15 +77,42 @@ public Properties getDebeziumProperties(
7677
props.setProperty("max.queue.size.in.bytes", BYTE_VALUE_256_MB);
7778

7879
// WARNING : Never change the value of this otherwise all the connectors would start syncing from
79-
// scratch
80-
props.setProperty(TOPIC_PREFIX_KEY, getName(config));
80+
// scratch.
81+
props.setProperty(TOPIC_PREFIX_KEY, sanitizeTopicPrefix(getName(config)));
8182

8283
// includes
8384
props.putAll(getIncludeConfiguration(catalog, config));
8485

8586
return props;
8687
}
8788

89+
public static String sanitizeTopicPrefix(final String topicName) {
90+
StringBuilder sanitizedNameBuilder = new StringBuilder(topicName.length());
91+
boolean changed = false;
92+
93+
for (int i = 0; i < topicName.length(); ++i) {
94+
char c = topicName.charAt(i);
95+
if (isValidCharacter(c)) {
96+
sanitizedNameBuilder.append(c);
97+
} else {
98+
sanitizedNameBuilder.append(ReplacementFunction.UNDERSCORE_REPLACEMENT.replace(c));
99+
changed = true;
100+
}
101+
}
102+
103+
if (changed) {
104+
return sanitizedNameBuilder.toString();
105+
} else {
106+
return topicName;
107+
}
108+
}
109+
110+
// We need to keep the validation rule the same as debezium engine, which is defined here:
111+
// https://github.com/debezium/debezium/blob/c51ef3099a688efb41204702d3aa6d4722bb4825/debezium-core/src/main/java/io/debezium/schema/AbstractTopicNamingStrategy.java#L178
112+
private static boolean isValidCharacter(char c) {
113+
return c == '.' || c == '_' || c == '-' || c >= 'A' && c <= 'Z' || c >= 'a' && c <= 'z' || c >= '0' && c <= '9';
114+
}
115+
88116
protected abstract Properties getConnectionConfiguration(final JsonNode config);
89117

90118
protected abstract String getName(final JsonNode config);

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/integrations/debezium/CdcSourceTest.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,19 @@ protected void writeRecords(
219219
recordJson.get(modelCol).asText());
220220
}
221221

222+
protected void deleteMessageOnIdCol(final String streamName, final String idCol, final int idValue) {
223+
testdb.with("DELETE FROM %s.%s WHERE %s = %s", modelsSchema(), streamName, idCol, idValue);
224+
}
225+
226+
protected void deleteCommand(final String streamName) {
227+
testdb.with("DELETE FROM %s.%s", modelsSchema(), streamName);
228+
}
229+
230+
protected void updateCommand(final String streamName, final String modelCol, final String modelVal, final String idCol, final int idValue) {
231+
testdb.with("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", modelsSchema(), streamName,
232+
modelCol, modelVal, COL_ID, 11);
233+
}
234+
222235
static protected Set<AirbyteRecordMessage> removeDuplicates(final Set<AirbyteRecordMessage> messages) {
223236
final Set<JsonNode> existingDataRecordsWithoutUpdated = new HashSet<>();
224237
final Set<AirbyteRecordMessage> output = new HashSet<>();
@@ -346,7 +359,7 @@ void testDelete() throws Exception {
346359
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
347360
assertExpectedStateMessages(stateMessages1);
348361

349-
testdb.with("DELETE FROM %s.%s WHERE %s = %s", modelsSchema(), MODELS_STREAM_NAME, COL_ID, 11);
362+
deleteMessageOnIdCol(MODELS_STREAM_NAME, COL_ID, 11);
350363

351364
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(stateMessages1.size() - 1)));
352365
final AutoCloseableIterator<AirbyteMessage> read2 = source()
@@ -375,8 +388,7 @@ void testUpdate() throws Exception {
375388
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
376389
assertExpectedStateMessages(stateMessages1);
377390

378-
testdb.with("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", modelsSchema(), MODELS_STREAM_NAME,
379-
COL_MODEL, updatedModel, COL_ID, 11);
391+
updateCommand(MODELS_STREAM_NAME, COL_MODEL, updatedModel, COL_ID, 11);
380392

381393
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(stateMessages1.size() - 1)));
382394
final AutoCloseableIterator<AirbyteMessage> read2 = source()
@@ -536,8 +548,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
536548
@DisplayName("When no records exist, no records are returned.")
537549
void testNoData() throws Exception {
538550

539-
testdb.with("DELETE FROM %s.%s", modelsSchema(), MODELS_STREAM_NAME);
540-
551+
deleteCommand(MODELS_STREAM_NAME);
541552
final AutoCloseableIterator<AirbyteMessage> read = source().read(config(), getConfiguredCatalog(), null);
542553
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);
543554

airbyte-integrations/connectors/source-mysql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
airbyteJavaConnector {
9-
cdkVersionRequired = '0.19.0'
9+
cdkVersionRequired = '0.20.3'
1010
features = ['db-sources']
1111
useLocalCdk = false
1212
}

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.3.4
12+
dockerImageTag: 3.3.5
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/cdc/MySqlDebeziumStateUtil.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,11 @@ public JsonNode constructInitialDebeziumState(final Properties properties,
237237
// We use the schema_only_recovery property cause using this mode will instruct Debezium to
238238
// construct the db schema history.
239239
properties.setProperty("snapshot.mode", "schema_only_recovery");
240+
final String dbName = database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText();
241+
// Topic.prefix is sanitized version of database name. At this stage properties does not have this
242+
// value - it's set in RelationalDbDebeziumPropertiesManager.
240243
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(
241-
constructBinlogOffset(database, database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText()),
244+
constructBinlogOffset(database, dbName, DebeziumPropertiesManager.sanitizeTopicPrefix(dbName)),
242245
Optional.empty());
243246
final AirbyteSchemaHistoryStorage schemaHistoryStorage =
244247
AirbyteSchemaHistoryStorage.initializeDBHistory(new SchemaHistory<>(Optional.empty(), false), COMPRESSION_ENABLED);
@@ -303,13 +306,13 @@ public static JsonNode serialize(final Map<String, String> offset, final SchemaH
303306
* Method to construct initial Debezium state which can be passed onto Debezium engine to make it
304307
* process binlogs from a specific file and position and skip snapshot phase
305308
*/
306-
private JsonNode constructBinlogOffset(final JdbcDatabase database, final String dbName) {
307-
return format(getStateAttributesFromDB(database), dbName, Instant.now());
309+
private JsonNode constructBinlogOffset(final JdbcDatabase database, final String debeziumName, final String topicPrefixName) {
310+
return format(getStateAttributesFromDB(database), debeziumName, topicPrefixName, Instant.now());
308311
}
309312

310313
@VisibleForTesting
311-
public JsonNode format(final MysqlDebeziumStateAttributes attributes, final String dbName, final Instant time) {
312-
final String key = "[\"" + dbName + "\",{\"server\":\"" + dbName + "\"}]";
314+
public JsonNode format(final MysqlDebeziumStateAttributes attributes, final String debeziumName, final String topicPrefixName, final Instant time) {
315+
final String key = "[\"" + debeziumName + "\",{\"server\":\"" + topicPrefixName + "\"}]";
313316
final String gtidSet = attributes.gtidSet().isPresent() ? ",\"gtids\":\"" + attributes.gtidSet().get() + "\"" : "";
314317
final String value =
315318
"{\"transaction_id\":null,\"ts_sec\":" + time.getEpochSecond() + ",\"file\":\"" + attributes.binlogFilename() + "\",\"pos\":"

airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java

+36-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,12 @@ protected void purgeAllBinaryLogs() {
100100

101101
@Override
102102
protected String createSchemaSqlFmt() {
103-
return "CREATE DATABASE IF NOT EXISTS %s;";
103+
return "CREATE DATABASE IF NOT EXISTS `%s`;";
104+
}
105+
106+
@Override
107+
protected String createTableSqlFmt() {
108+
return "CREATE TABLE `%s`.`%s`(%s);";
104109
}
105110

106111
@Override
@@ -176,6 +181,36 @@ protected void addCdcDefaultCursorField(final AirbyteStream stream) {
176181
}
177182
}
178183

184+
@Override
185+
protected void writeRecords(
186+
final JsonNode recordJson,
187+
final String dbName,
188+
final String streamName,
189+
final String idCol,
190+
final String makeIdCol,
191+
final String modelCol) {
192+
testdb.with("INSERT INTO `%s` .`%s` (%s, %s, %s) VALUES (%s, %s, '%s');", dbName, streamName,
193+
idCol, makeIdCol, modelCol,
194+
recordJson.get(idCol).asInt(), recordJson.get(makeIdCol).asInt(),
195+
recordJson.get(modelCol).asText());
196+
}
197+
198+
@Override
199+
protected void deleteMessageOnIdCol(final String streamName, final String idCol, final int idValue) {
200+
testdb.with("DELETE FROM `%s`.`%s` WHERE %s = %s", modelsSchema(), streamName, idCol, idValue);
201+
}
202+
203+
@Override
204+
protected void deleteCommand(final String streamName) {
205+
testdb.with("DELETE FROM `%s`.`%s`", modelsSchema(), streamName);
206+
}
207+
208+
@Override
209+
protected void updateCommand(final String streamName, final String modelCol, final String modelVal, final String idCol, final int idValue) {
210+
testdb.with("UPDATE `%s`.`%s` SET %s = '%s' WHERE %s = %s", modelsSchema(), streamName,
211+
modelCol, modelVal, COL_ID, 11);
212+
}
213+
179214
@Test
180215
protected void syncWithReplicationClientPrivilegeRevokedFailsCheck() throws Exception {
181216
testdb.with("REVOKE REPLICATION CLIENT ON *.* FROM %s@'%%';", testdb.getUserName());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.mysql;
6+
7+
import io.airbyte.integrations.source.mysql.MySQLTestDatabase.BaseImage;
8+
import io.airbyte.integrations.source.mysql.MySQLTestDatabase.ContainerModifier;
9+
10+
public class CdcMysqlSourceWithSpecialDbNameTest extends CdcMysqlSourceTest {
11+
12+
public static final String INVALID_DB_NAME = "invalid@name";
13+
14+
@Override
15+
protected MySQLTestDatabase createTestDatabase() {
16+
return MySQLTestDatabase.inWithDbName(BaseImage.MYSQL_8, INVALID_DB_NAME, ContainerModifier.INVALID_TIMEZONE_CEST, ContainerModifier.CUSTOM_NAME)
17+
.withCdcPermissions();
18+
}
19+
20+
}

airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MysqlDebeziumStateUtilTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public void debeziumInitialStateConstructTest() throws SQLException {
9292
public void formatTestWithGtid() {
9393
final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
9494
final JsonNode debeziumState = mySqlDebeziumStateUtil.format(new MysqlDebeziumStateAttributes("binlog.000002", 633,
95-
Optional.of("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
95+
Optional.of("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")), "db_fgnfxvllud", "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
9696
final Map<String, String> stateAsMap = Jsons.object(debeziumState, Map.class);
9797
Assertions.assertEquals(1, stateAsMap.size());
9898
Assertions.assertTrue(stateAsMap.containsKey("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]"));
@@ -113,15 +113,15 @@ public void formatTestWithGtid() {
113113
debeziumState, config);
114114
Assertions.assertTrue(parsedOffset.isPresent());
115115
final JsonNode stateGeneratedUsingParsedOffset =
116-
mySqlDebeziumStateUtil.format(parsedOffset.get(), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
116+
mySqlDebeziumStateUtil.format(parsedOffset.get(), "db_fgnfxvllud", "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
117117
Assertions.assertEquals(debeziumState, stateGeneratedUsingParsedOffset);
118118
}
119119

120120
@Test
121121
public void formatTestWithoutGtid() {
122122
final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
123123
final JsonNode debeziumState = mySqlDebeziumStateUtil.format(new MysqlDebeziumStateAttributes("binlog.000002", 633,
124-
Optional.empty()), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
124+
Optional.empty()), "db_fgnfxvllud", "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
125125
final Map<String, String> stateAsMap = Jsons.object(debeziumState, Map.class);
126126
Assertions.assertEquals(1, stateAsMap.size());
127127
Assertions.assertTrue(stateAsMap.containsKey("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]"));
@@ -141,7 +141,7 @@ public void formatTestWithoutGtid() {
141141
debeziumState, config);
142142
Assertions.assertTrue(parsedOffset.isPresent());
143143
final JsonNode stateGeneratedUsingParsedOffset =
144-
mySqlDebeziumStateUtil.format(parsedOffset.get(), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
144+
mySqlDebeziumStateUtil.format(parsedOffset.get(), "db_fgnfxvllud", "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
145145
Assertions.assertEquals(debeziumState, stateGeneratedUsingParsedOffset);
146146
}
147147

airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLContainerFactory.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public void withMoscowTimezone(MySQLContainer<?> container) {
3535
container.withEnv("TZ", "Europe/Moscow");
3636
}
3737

38+
/**
 * Container modifier that leaves the container untouched.
 * NOTE(review): presumably exists so that ContainerModifier.CUSTOM_NAME resolves to a method on
 * this factory — confirm.
 */
public void withCustomName(MySQLContainer<?> container) {
  // do nothing
}
39+
3840
public void withRootAndServerCertificates(MySQLContainer<?> container) {
3941
execInContainer(container,
4042
"sed -i '31 a ssl' /etc/my.cnf",

airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java

+34-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public enum ContainerModifier {
3737
ROOT_AND_SERVER_CERTIFICATES("withRootAndServerCertificates"),
3838
CLIENT_CERTITICATE("withClientCertificate"),
3939
NETWORK("withNetwork"),
40-
;
40+
41+
CUSTOM_NAME("withCustomName");
4142

4243
public final String methodName;
4344

@@ -53,6 +54,15 @@ static public MySQLTestDatabase in(BaseImage baseImage, ContainerModifier... met
5354
return new MySQLTestDatabase(container).initialized();
5455
}
5556

57+
static public MySQLTestDatabase inWithDbName(BaseImage baseImage, String dbName, ContainerModifier... methods) {
58+
String[] methodNames = Stream.of(methods).map(im -> im.methodName).toList().toArray(new String[0]);
59+
final var container = new MySQLContainerFactory().shared(baseImage.reference, methodNames);
60+
MySQLTestDatabase db = new MySQLTestDatabase(container);
61+
db.setDatabaseName(dbName);
62+
db.initialized();
63+
return db;
64+
}
65+
5666
public MySQLTestDatabase(MySQLContainer<?> container) {
5767
super(container);
5868
}
@@ -70,6 +80,26 @@ public MySQLTestDatabase withoutStrictMode() {
7080
}
7181

7282
static private final int MAX_CONNECTIONS = 1000;
83+
private String databaseName = "";
84+
85+
@Override
86+
public String getDatabaseName() {
87+
if (databaseName.isBlank()) {
88+
return super.getDatabaseName();
89+
} else {
90+
return databaseName;
91+
}
92+
}
93+
94+
@Override
95+
public void close() {
96+
super.close();
97+
databaseName = "";
98+
}
99+
100+
public void setDatabaseName(final String databaseName) {
101+
this.databaseName = databaseName;
102+
}
73103

74104
@Override
75105
protected Stream<Stream<String>> inContainerBootstrapCmd() {
@@ -80,18 +110,19 @@ protected Stream<Stream<String>> inContainerBootstrapCmd() {
80110
"sh", "-c", "ln -s -f /var/lib/mysql/mysql.sock /var/run/mysqld/mysqld.sock"),
81111
mysqlCmd(Stream.of(
82112
String.format("SET GLOBAL max_connections=%d", MAX_CONNECTIONS),
83-
String.format("CREATE DATABASE %s", getDatabaseName()),
113+
String.format("CREATE DATABASE \\`%s\\`", getDatabaseName()),
84114
String.format("CREATE USER '%s' IDENTIFIED BY '%s'", getUserName(), getPassword()),
85115
// Grant privileges also to the container's user, which is not root.
86116
String.format("GRANT ALL PRIVILEGES ON *.* TO '%s', '%s' WITH GRANT OPTION", getUserName(),
87117
getContainer().getUsername()))));
118+
88119
}
89120

90121
@Override
91122
protected Stream<String> inContainerUndoBootstrapCmd() {
92123
return mysqlCmd(Stream.of(
93124
String.format("DROP USER '%s'", getUserName()),
94-
String.format("DROP DATABASE %s", getDatabaseName())));
125+
String.format("DROP DATABASE \\`%s\\`", getDatabaseName())));
95126
}
96127

97128
@Override

0 commit comments

Comments
 (0)