Skip to content

Commit b230543

Browse files
authored
MySQL CDC sync fails because starting binlog position not found in DB #6425 (#9514)
* Check binlog position on mysql server before run sync job, add error description into log * fix MySqlStrictEncryptSourceAcceptanceTest * fix formatting * fix review comments * added java docs and fixed few minor comments * fix formatting * update versions * update source_specs.yaml
1 parent e3e05d7 commit b230543

File tree

10 files changed

+174
-34
lines changed

10 files changed

+174
-34
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
33
"name": "MySQL",
44
"dockerRepository": "airbyte/source-mysql",
5-
"dockerImageTag": "0.5.1",
5+
"dockerImageTag": "0.5.2",
66
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql",
77
"icon": "mysql.svg"
88
}

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@
474474
- name: MySQL
475475
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
476476
dockerRepository: airbyte/source-mysql
477-
dockerImageTag: 0.5.1
477+
dockerImageTag: 0.5.2
478478
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
479479
icon: mysql.svg
480480
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4963,7 +4963,7 @@
49634963
supportsNormalization: false
49644964
supportsDBT: false
49654965
supported_destination_sync_modes: []
4966-
- dockerImage: "airbyte/source-mysql:0.5.1"
4966+
- dockerImage: "airbyte/source-mysql:0.5.2"
49674967
spec:
49684968
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
49694969
connectionSpecification:

airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceAcceptanceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ protected static List<AirbyteRecordMessage> filterRecords(final Collection<Airby
286286
.collect(Collectors.toList());
287287
}
288288

289-
private ConfiguredAirbyteCatalog withSourceDefinedCursors(final ConfiguredAirbyteCatalog catalog) {
289+
protected ConfiguredAirbyteCatalog withSourceDefinedCursors(final ConfiguredAirbyteCatalog catalog) {
290290
final ConfiguredAirbyteCatalog clone = Jsons.clone(catalog);
291291
for (final ConfiguredAirbyteStream configuredStream : clone.getStreams()) {
292292
if (configuredStream.getSyncMode() == INCREMENTAL

airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.1.7
19+
LABEL io.airbyte.version=0.1.8
2020
LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt

airbyte-integrations/connectors/source-mysql/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-mysql
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.5.1
19+
LABEL io.airbyte.version=0.5.2
2020
LABEL io.airbyte.name=airbyte/source-mysql

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java

+8-24
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
88
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
9+
import static io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper.checkBinlog;
910
import static java.util.stream.Collectors.toList;
1011

1112
import com.fasterxml.jackson.databind.JsonNode;
@@ -21,8 +22,10 @@
2122
import io.airbyte.integrations.base.ssh.SshWrappedSource;
2223
import io.airbyte.integrations.debezium.AirbyteDebeziumHandler;
2324
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
25+
import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper;
2426
import io.airbyte.integrations.source.relationaldb.StateManager;
2527
import io.airbyte.integrations.source.relationaldb.TableInfo;
28+
import io.airbyte.integrations.source.relationaldb.models.CdcState;
2629
import io.airbyte.protocol.models.AirbyteCatalog;
2730
import io.airbyte.protocol.models.AirbyteMessage;
2831
import io.airbyte.protocol.models.AirbyteStream;
@@ -97,9 +100,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
97100
public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final JsonNode config) throws Exception {
98101
final List<CheckedConsumer<JdbcDatabase, Exception>> checkOperations = new ArrayList<>(super.getCheckOperations(config));
99102
if (isCdc(config)) {
100-
checkOperations.addAll(List.of(getCheckOperation("log_bin", "ON"),
101-
getCheckOperation("binlog_format", "ROW"),
102-
getCheckOperation("binlog_row_image", "FULL")));
103+
checkOperations.addAll(CdcConfigurationHelper.getCheckOperations());
103104
}
104105
return checkOperations;
105106
}
@@ -180,8 +181,10 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
180181
new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), MySqlCdcProperties.getDebeziumProperties(),
181182
catalog, true);
182183

183-
return handler.getIncrementalIterators(new MySqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()),
184-
new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt);
184+
Optional<CdcState> cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState());
185+
MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null));
186+
cdcState.ifPresent(cdc -> checkBinlog(cdc.getState(), database));
187+
return handler.getIncrementalIterators(fetcher, new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt);
185188
} else {
186189
LOGGER.info("using CDC: {}", false);
187190
return super.getIncrementalIterators(database, catalog, tableNameToTable, stateManager,
@@ -210,23 +213,4 @@ public enum ReplicationMethod {
210213
CDC
211214
}
212215

213-
private CheckedConsumer<JdbcDatabase, Exception> getCheckOperation(String name, String value) {
214-
return database -> {
215-
final List<String> result = database.resultSetQuery(connection -> {
216-
final String sql = String.format("show variables where Variable_name = '%s'", name);
217-
218-
return connection.createStatement().executeQuery(sql);
219-
}, resultSet -> resultSet.getString("Value")).collect(toList());
220-
221-
if (result.size() != 1) {
222-
throw new RuntimeException(String.format("Could not query the variable %s", name));
223-
}
224-
225-
final String resultValue = result.get(0);
226-
if (!resultValue.equalsIgnoreCase(value)) {
227-
throw new RuntimeException(String.format("The variable %s should be set to %s, but it is : %s", name, value, resultValue));
228-
}
229-
};
230-
}
231-
232216
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.mysql.helpers;
6+
7+
import static java.util.stream.Collectors.toList;
8+
9+
import com.fasterxml.jackson.databind.JsonNode;
10+
import io.airbyte.commons.functional.CheckedConsumer;
11+
import io.airbyte.commons.json.Jsons;
12+
import io.airbyte.db.jdbc.JdbcDatabase;
13+
import java.sql.SQLException;
14+
import java.util.Iterator;
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.Optional;
18+
import java.util.stream.Collectors;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
/**
23+
* Helper class for MySqlSource used to check cdc configuration in case of:
24+
* <p>
25+
* 1. adding new source and checking operations #getCheckOperations method.
26+
* </p>
27+
* <p>
28+
* 2. checking whether binlog required from saved cdc offset is available on mysql server
29+
* #checkBinlog method
30+
* </p>
31+
*/
32+
public class CdcConfigurationHelper {
33+
34+
private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigurationHelper.class);
35+
private static final String CDC_OFFSET = "mysql_cdc_offset";
36+
private static final String LOG_BIN = "log_bin";
37+
private static final String BINLOG_FORMAT = "binlog_format";
38+
private static final String BINLOG_ROW_IMAGE = "binlog_row_image";
39+
40+
/**
41+
* Method will check whether required binlog is available on mysql server
42+
*
43+
* @param offset - saved cdc offset with required binlog file
44+
* @param database - database
45+
*/
46+
public static void checkBinlog(JsonNode offset, JdbcDatabase database) {
47+
Optional<String> binlogOptional = getBinlog(offset);
48+
binlogOptional.ifPresent(binlog -> {
49+
if (isBinlogAvailable(binlog, database)) {
50+
LOGGER.info("""
51+
Binlog %s is available""".formatted(binlog));
52+
} else {
53+
String error =
54+
"""
55+
Binlog %s is not available. This is a critical error, it means that requested binlog is not present on mysql server. To fix data synchronization you need to reset your data. Please check binlog retention policy configurations."""
56+
.formatted(binlog);
57+
LOGGER.error(error);
58+
throw new RuntimeException("""
59+
Binlog %s is not available.""".formatted(binlog));
60+
}
61+
});
62+
}
63+
64+
/**
65+
* Method will get required configurations for cdc sync
66+
*
67+
* @return list of List<CheckedConsumer<JdbcDatabase, Exception>>
68+
*/
69+
public static List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations() {
70+
return List.of(getCheckOperation(LOG_BIN, "ON"),
71+
getCheckOperation(BINLOG_FORMAT, "ROW"),
72+
getCheckOperation(BINLOG_ROW_IMAGE, "FULL"));
73+
74+
}
75+
76+
private static boolean isBinlogAvailable(String binlog, JdbcDatabase database) {
77+
try {
78+
List<String> binlogs = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW BINARY LOGS"),
79+
resultSet -> resultSet.getString("Log_name")).collect(Collectors.toList());
80+
81+
return !binlog.isEmpty() && binlogs.stream().anyMatch(e -> e.equals(binlog));
82+
} catch (SQLException e) {
83+
LOGGER.error("Can not get binlog list. Error: ", e);
84+
throw new RuntimeException(e);
85+
}
86+
}
87+
88+
private static Optional<String> getBinlog(JsonNode offset) {
89+
JsonNode node = offset.get(CDC_OFFSET);
90+
Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
91+
while (fields.hasNext()) {
92+
Map.Entry<String, JsonNode> jsonField = fields.next();
93+
return Optional.ofNullable(Jsons.deserialize(jsonField.getValue().asText()).path("file").asText());
94+
}
95+
return Optional.empty();
96+
}
97+
98+
private static CheckedConsumer<JdbcDatabase, Exception> getCheckOperation(String name, String value) {
99+
return database -> {
100+
final List<String> result = database.resultSetQuery(connection -> {
101+
final String sql = """
102+
show variables where Variable_name = '%s'""".formatted(name);
103+
104+
return connection.createStatement().executeQuery(sql);
105+
}, resultSet -> resultSet.getString("Value")).collect(toList());
106+
107+
if (result.size() != 1) {
108+
throw new RuntimeException("""
109+
Could not query the variable %s""".formatted(name));
110+
}
111+
112+
final String resultValue = result.get(0);
113+
if (!resultValue.equalsIgnoreCase(value)) {
114+
throw new RuntimeException("""
115+
The variable %s should be set to %s, but it is : %s""".formatted(name, value, resultValue));
116+
}
117+
};
118+
}
119+
120+
}

airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceAcceptanceTest.java

+39-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
package io.airbyte.integrations.source.mysql;
66

7+
import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
8+
import static org.junit.jupiter.api.Assertions.assertFalse;
9+
import static org.junit.jupiter.api.Assertions.assertThrows;
10+
711
import com.fasterxml.jackson.databind.JsonNode;
812
import com.google.common.collect.ImmutableMap;
913
import com.google.common.collect.Lists;
@@ -14,6 +18,9 @@
1418
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
1519
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
1620
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
21+
import io.airbyte.protocol.models.AirbyteMessage;
22+
import io.airbyte.protocol.models.AirbyteRecordMessage;
23+
import io.airbyte.protocol.models.AirbyteStateMessage;
1724
import io.airbyte.protocol.models.CatalogHelpers;
1825
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
1926
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
@@ -23,7 +30,9 @@
2330
import io.airbyte.protocol.models.JsonSchemaPrimitive;
2431
import io.airbyte.protocol.models.SyncMode;
2532
import java.util.List;
33+
import java.util.stream.Collectors;
2634
import org.jooq.SQLDialect;
35+
import org.junit.jupiter.api.Test;
2736
import org.testcontainers.containers.MySQLContainer;
2837

2938
public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {
@@ -52,7 +61,7 @@ protected JsonNode getConfig() {
5261
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
5362
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
5463
new ConfiguredAirbyteStream()
55-
.withSyncMode(SyncMode.INCREMENTAL)
64+
.withSyncMode(INCREMENTAL)
5665
.withDestinationSyncMode(DestinationSyncMode.APPEND)
5766
.withStream(CatalogHelpers.createAirbyteStream(
5867
String.format("%s", STREAM_NAME),
@@ -62,9 +71,9 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
6271
.withSourceDefinedCursor(true)
6372
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
6473
.withSupportedSyncModes(
65-
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
74+
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))),
6675
new ConfiguredAirbyteStream()
67-
.withSyncMode(SyncMode.INCREMENTAL)
76+
.withSyncMode(INCREMENTAL)
6877
.withDestinationSyncMode(DestinationSyncMode.APPEND)
6978
.withStream(CatalogHelpers.createAirbyteStream(
7079
String.format("%s", STREAM_NAME2),
@@ -74,7 +83,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
7483
.withSourceDefinedCursor(true)
7584
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
7685
.withSupportedSyncModes(
77-
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
86+
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)))));
7887
}
7988

8089
@Override
@@ -143,4 +152,30 @@ protected void tearDown(final TestDestinationEnv testEnv) {
143152
container.close();
144153
}
145154

155+
@Test
156+
public void testIncrementalSyncFailedIfBinlogIsDeleted() throws Exception {
157+
final ConfiguredAirbyteCatalog configuredCatalog = withSourceDefinedCursors(getConfiguredCatalog());
158+
// only sync incremental streams
159+
configuredCatalog.setStreams(
160+
configuredCatalog.getStreams().stream().filter(s -> s.getSyncMode() == INCREMENTAL).collect(Collectors.toList()));
161+
162+
final List<AirbyteMessage> airbyteMessages = runRead(configuredCatalog, getState());
163+
final List<AirbyteRecordMessage> recordMessages = filterRecords(airbyteMessages);
164+
final List<AirbyteStateMessage> stateMessages = airbyteMessages
165+
.stream()
166+
.filter(m -> m.getType() == AirbyteMessage.Type.STATE)
167+
.map(AirbyteMessage::getState)
168+
.collect(Collectors.toList());
169+
assertFalse(recordMessages.isEmpty(), "Expected the first incremental sync to produce records");
170+
assertFalse(stateMessages.isEmpty(), "Expected incremental sync to produce STATE messages");
171+
172+
// when we run incremental sync again there should be no new records. Run a sync with the latest
173+
// state message and assert no records were emitted.
174+
final JsonNode latestState = stateMessages.get(stateMessages.size() - 1).getData();
175+
// RESET MASTER removes all binary log files that are listed in the index file,
176+
// leaving only a single, empty binary log file with a numeric suffix of .000001
177+
executeQuery("RESET MASTER;");
178+
assertThrows(Exception.class, () -> filterRecords(runRead(configuredCatalog, latestState)));
179+
}
180+
146181
}

docs/integrations/sources/mysql.md

+1
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ If you do not see a type in this list, assume that it is coerced into a string.
182182

183183
| Version | Date | Pull Request | Subject |
184184
|:--------| :--- | :--- | :--- |
185+
| 0.5.2 | 2021-12-14 | [6425](https://github.com/airbytehq/airbyte/issues/6425) | MySQL CDC sync fails because starting binlog position not found in DB |
185186
| 0.5.1 | 2021-12-13 | [8582](https://github.com/airbytehq/airbyte/pull/8582) | Update connector fields title/description |
186187
| 0.5.0 | 2021-12-11 | [7970](https://github.com/airbytehq/airbyte/pull/7970) | Support all MySQL types |
187188
| 0.4.13 | 2021-12-03 | [8335](https://github.com/airbytehq/airbyte/pull/8335) | Source-MySql: do not check cdc required param binlog_row_image for standard replication |

0 commit comments

Comments
 (0)