Skip to content

Commit ec6da09

Browse files
author
nguyenaiden
committed
Enabled logging for MySQL CDC Incremental sync
1 parent 6d49df7 commit ec6da09

File tree

4 files changed

+93
-1
lines changed

4 files changed

+93
-1
lines changed

airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mysql/MySqlCdcTargetPosition.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package io.airbyte.integrations.debezium.internals.mysql;
66

7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.airbyte.commons.json.Jsons;
79
import io.airbyte.db.jdbc.JdbcDatabase;
810
import io.airbyte.integrations.debezium.CdcTargetPosition;
911
import io.airbyte.integrations.debezium.internals.ChangeEventWithMetadata;
@@ -102,6 +104,50 @@ public boolean isHeartbeatSupported() {
102104
return true;
103105
}
104106

107+
@Override
108+
public boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
109+
if (offset.size() != 1) {
110+
return false;
111+
}
112+
113+
final String eventFileName = event.eventValueAsJson().get("source").get("file").asText();
114+
final long eventPosition = event.eventValueAsJson().get("source").get("pos").asLong();
115+
116+
final JsonNode offsetJson = Jsons.deserialize((String) offset.values().toArray()[0]);
117+
118+
final String offsetFileName = offsetJson.get("file").asText();
119+
final long offsetPosition = offsetJson.get("pos").asLong();
120+
if (eventFileName.compareTo(offsetFileName) != 0) {
121+
return eventFileName.compareTo(offsetFileName) > 0;
122+
}
123+
124+
return eventPosition > offsetPosition;
125+
}
126+
127+
@Override
128+
public boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
129+
if (offsetA == null || offsetA.size() != 1) {
130+
return false;
131+
}
132+
if (offsetB == null || offsetB.size() != 1) {
133+
return false;
134+
}
135+
136+
final JsonNode offsetJsonA = Jsons.deserialize((String) offsetA.values().toArray()[0]);
137+
final String offsetAFileName = offsetJsonA.get("file").asText();
138+
final long offsetAPosition = offsetJsonA.get("pos").asLong();
139+
140+
final JsonNode offsetJsonB = Jsons.deserialize((String) offsetB.values().toArray()[0]);
141+
final String offsetBFileName = offsetJsonB.get("file").asText();
142+
final long offsetBPosition = offsetJsonB.get("pos").asLong();
143+
144+
if (offsetAFileName.compareTo(offsetBFileName) != 0) {
145+
return false;
146+
}
147+
148+
return offsetAPosition == offsetBPosition;
149+
}
150+
105151
@Override
106152
public MySqlCdcPosition extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset) {
107153
return new MySqlCdcPosition(sourceOffset.get("file").toString(), (Long) sourceOffset.get("pos"));

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcStateHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public MySqlCdcStateHandler(final StateManager stateManager) {
3131
this.stateManager = stateManager;
3232
}
3333

34+
@Override
35+
public boolean isCdcCheckpointEnabled() {
36+
return true;
37+
}
38+
3439
@Override
3540
public AirbyteMessage saveState(final Map<String, String> offset, final String dbHistory) {
3641
final Map<String, Object> state = new HashMap<>();

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source {
8787

8888
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSource.class);
89-
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;
89+
// NOTE(review): the frequency is 1 here, whereas the line this replaces used 10_000.
// Confirm the new value is intentional and not a leftover debugging setting.
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 1;
9090
public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY =
9191
"""
9292
SELECT (EXISTS (SELECT * from `%s`.`%s` where `%s` IS NULL LIMIT 1)) AS %s

airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,47 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
295295
"Expected 46 records to be replicated in the second sync.");
296296
}
297297

298+
/**
299+
* This test verifies that multiple states are sent during the CDC process based on number of records.
300+
* We can ensure that more than one `STATE` type of message is sent, but we are not able to assert
301+
* the exact number of messages sent as depends on Debezium.
302+
*
303+
* @throws Exception Exception happening in the test.
304+
*/
305+
@Test
306+
protected void verifyCheckpointStatesByRecords() throws Exception {
307+
// We require a huge amount of records, otherwise Debezium will notify directly the last offset.
308+
final int recordsToCreate = 20000;
309+
310+
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
311+
.read(getConfig(), CONFIGURED_CATALOG, null);
312+
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
313+
.toListAndClose(firstBatchIterator);
314+
final List<AirbyteStateMessage> stateMessages = extractStateMessages(dataFromFirstBatch);
315+
316+
// As first `read` operation is from snapshot, it would generate only one state message at the end
317+
// of the process.
318+
assertExpectedStateMessages(stateMessages);
319+
320+
for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
321+
final JsonNode record =
322+
Jsons.jsonNode(ImmutableMap
323+
.of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
324+
"F-" + recordsCreated));
325+
writeModelRecord(record);
326+
}
327+
328+
final JsonNode stateAfterFirstSync = Jsons.jsonNode(Collections.singletonList(stateMessages.get(stateMessages.size() - 1)));
329+
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
330+
.read(getConfig(), CONFIGURED_CATALOG, stateAfterFirstSync);
331+
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
332+
.toListAndClose(secondBatchIterator);
333+
assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
334+
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch);
335+
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
336+
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
337+
}
338+
298339
protected void assertStateForSyncShouldHandlePurgedLogsGracefully(final List<AirbyteStateMessage> stateMessages, final int syncNumber) {
299340
assertExpectedStateMessages(stateMessages);
300341
}

0 commit comments

Comments
 (0)