Skip to content

Commit 195bef3

Browse files
Duy Nguyenharrytou
Duy Nguyen
authored andcommitted
[Source-MySQL]CDC: Enable frequent state emission during incremental runs (airbytehq#29308)
1 parent d6f6557 commit 195bef3

File tree

12 files changed

+97
-11
lines changed

12 files changed

+97
-11
lines changed

airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/CdcTargetPosition.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ default boolean isHeartbeatSupported() {
5959
*
6060
* @param offset DB CDC offset
6161
* @param event Event from the CDC load
62-
* @return Returns `true` when the record is behind the offset. Otherwise, it returns `false`
62+
* @return Returns `true` when the event is ahead of the offset. Otherwise, it returns `false`
6363
*/
64-
default boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
64+
default boolean isEventAheadOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
6565
return false;
6666
}
6767

airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumStateDecoratingIterator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ protected AirbyteMessage computeNext() {
162162
if (checkpointOffsetToSend.size() == 1
163163
&& changeEventIterator.hasNext()
164164
&& !event.isSnapshotEvent()
165-
&& targetPosition.isRecordBehindOffset(checkpointOffsetToSend, event)) {
165+
&& targetPosition.isEventAheadOffset(checkpointOffsetToSend, event)) {
166166
sendCheckpointMessage = true;
167167
}
168168
}

airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/mysql/MySqlCdcTargetPosition.java

+39
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package io.airbyte.integrations.debezium.internals.mysql;
66

7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.airbyte.commons.json.Jsons;
79
import io.airbyte.db.jdbc.JdbcDatabase;
810
import io.airbyte.integrations.debezium.CdcTargetPosition;
911
import io.airbyte.integrations.debezium.internals.ChangeEventWithMetadata;
@@ -102,6 +104,43 @@ public boolean isHeartbeatSupported() {
102104
return true;
103105
}
104106

107+
@Override
108+
public boolean isEventAheadOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
109+
if (offset.size() != 1) {
110+
return false;
111+
}
112+
113+
final String eventFileName = event.eventValueAsJson().get("source").get("file").asText();
114+
final long eventPosition = event.eventValueAsJson().get("source").get("pos").asLong();
115+
116+
final JsonNode offsetJson = Jsons.deserialize((String) offset.values().toArray()[0]);
117+
118+
final String offsetFileName = offsetJson.get("file").asText();
119+
final long offsetPosition = offsetJson.get("pos").asLong();
120+
if (eventFileName.compareTo(offsetFileName) != 0) {
121+
return eventFileName.compareTo(offsetFileName) > 0;
122+
}
123+
124+
return eventPosition > offsetPosition;
125+
}
126+
127+
@Override
128+
public boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
129+
if ((offsetA == null || offsetA.size() != 1) || (offsetB == null || offsetB.size() != 1)) {
130+
return false;
131+
}
132+
133+
final JsonNode offsetJsonA = Jsons.deserialize((String) offsetA.values().toArray()[0]);
134+
final String offsetAFileName = offsetJsonA.get("file").asText();
135+
final long offsetAPosition = offsetJsonA.get("pos").asLong();
136+
137+
final JsonNode offsetJsonB = Jsons.deserialize((String) offsetB.values().toArray()[0]);
138+
final String offsetBFileName = offsetJsonB.get("file").asText();
139+
final long offsetBPosition = offsetJsonB.get("pos").asLong();
140+
141+
return offsetAFileName.equals(offsetBFileName) && offsetAPosition == offsetBPosition;
142+
}
143+
105144
@Override
106145
public MySqlCdcPosition extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset) {
107146
return new MySqlCdcPosition(sourceOffset.get("file").toString(), (Long) sourceOffset.get("pos"));

airbyte-integrations/bases/debezium/src/main/java/io/airbyte/integrations/debezium/internals/postgres/PostgresCdcTargetPosition.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public Long extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset
9494
}
9595

9696
@Override
97-
public boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
97+
public boolean isEventAheadOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
9898
if (offset.size() != 1) {
9999
return false;
100100
}

airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ ENV APPLICATION source-mysql-strict-encrypt
2424

2525
COPY --from=build /airbyte /airbyte
2626

27-
LABEL io.airbyte.version=3.0.0
27+
LABEL io.airbyte.version=3.0.1
2828

2929
LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt

airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ data:
1111
connectorSubtype: database
1212
connectorType: source
1313
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
14-
dockerImageTag: 3.0.0
14+
dockerImageTag: 3.0.1
1515
dockerRepository: airbyte/source-mysql-strict-encrypt
1616
githubIssueLabel: source-mysql
1717
icon: mysql.svg

airbyte-integrations/connectors/source-mysql/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ ENV APPLICATION source-mysql
2424

2525
COPY --from=build /airbyte /airbyte
2626

27-
LABEL io.airbyte.version=3.0.0
27+
LABEL io.airbyte.version=3.0.1
2828

2929
LABEL io.airbyte.name=airbyte/source-mysql

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ data:
66
connectorSubtype: database
77
connectorType: source
88
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
9-
dockerImageTag: 3.0.0
9+
dockerImageTag: 3.0.1
1010
dockerRepository: airbyte/source-mysql
1111
githubIssueLabel: source-mysql
1212
icon: mysql.svg

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcStateHandler.java

+5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public MySqlCdcStateHandler(final StateManager stateManager) {
3131
this.stateManager = stateManager;
3232
}
3333

34+
@Override
35+
public boolean isCdcCheckpointEnabled() {
36+
return true;
37+
}
38+
3439
@Override
3540
public AirbyteMessage saveState(final Map<String, String> offset, final String dbHistory) {
3641
final Map<String, Object> state = new HashMap<>();

airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java

+41
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,47 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
304304
"Expected 46 records to be replicated in the second sync.");
305305
}
306306

307+
/**
308+
* This test verifies that multiple states are sent during the CDC process based on number of records.
309+
* We can ensure that more than one `STATE` type of message is sent, but we are not able to assert
310+
* the exact number of messages sent as depends on Debezium.
311+
*
312+
* @throws Exception Exception happening in the test.
313+
*/
314+
@Test
315+
protected void verifyCheckpointStatesByRecords() throws Exception {
316+
// We require a huge amount of records, otherwise Debezium will notify directly the last offset.
317+
final int recordsToCreate = 20000;
318+
319+
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
320+
.read(getConfig(), CONFIGURED_CATALOG, null);
321+
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
322+
.toListAndClose(firstBatchIterator);
323+
final List<AirbyteStateMessage> stateMessages = extractStateMessages(dataFromFirstBatch);
324+
325+
// As first `read` operation is from snapshot, it would generate only one state message at the end
326+
// of the process.
327+
assertExpectedStateMessages(stateMessages);
328+
329+
for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
330+
final JsonNode record =
331+
Jsons.jsonNode(ImmutableMap
332+
.of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
333+
"F-" + recordsCreated));
334+
writeModelRecord(record);
335+
}
336+
337+
final JsonNode stateAfterFirstSync = Jsons.jsonNode(Collections.singletonList(stateMessages.get(stateMessages.size() - 1)));
338+
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
339+
.read(getConfig(), CONFIGURED_CATALOG, stateAfterFirstSync);
340+
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
341+
.toListAndClose(secondBatchIterator);
342+
assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
343+
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch);
344+
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
345+
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
346+
}
347+
307348
protected void assertStateForSyncShouldHandlePurgedLogsGracefully(final List<AirbyteStateMessage> stateMessages, final int syncNumber) {
308349
assertExpectedStateMessages(stateMessages);
309350
}
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Source Mysql Migration Guide
1+
# MySQL Migration Guide
22

33
## Upgrading to 3.0.0
44
CDC syncs now has default cursor field called `_ab_cdc_cursor`. You will need to force normalization to rebuild your destination tables by manually dropping the SCD tables, refreshing the connection schema (skipping the reset), and running a sync. Alternatively, you can just run a reset.

docs/integrations/sources/mysql.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,9 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura
263263
## Changelog
264264

265265
| Version | Date | Pull Request | Subject |
266-
| :------ | :--------- | :--------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------- |
267-
| 3.0.0 | 2023-08-08 | [28756](https://github.com/airbytehq/airbyte/pull/28756) | Set a default cursor for Cdc mode |
266+
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
267+
| 3.0.1 | 2023-08-21 | [29308](https://github.com/airbytehq/airbyte/pull/29308) | CDC: Enable frequent state emissions during incremental runs |
268+
| 3.0.0 | 2023-08-08 | [28756](https://github.com/airbytehq/airbyte/pull/28756) | CDC: Set a default cursor |
268269
| 2.1.2 | 2023-08-08 | [29220](https://github.com/airbytehq/airbyte/pull/29220) | Add indicator that CDC is the recommended update method |
269270
| 2.1.1 | 2023-07-31 | [28882](https://github.com/airbytehq/airbyte/pull/28882) | Improve replication method labels and descriptions |
270271
| 2.1.0 | 2023-06-26 | [27737](https://github.com/airbytehq/airbyte/pull/27737) | License Update: Elv2 |

0 commit comments

Comments
 (0)