[Source-MySQL]CDC: Enable frequent state emission during incremental runs #29308

Merged
merged 6 commits into from
Aug 22, 2023
@@ -59,9 +59,9 @@ default boolean isHeartbeatSupported() {
*
* @param offset DB CDC offset
* @param event Event from the CDC load
-* @return Returns `true` when the record is behind the offset. Otherwise, it returns `false`
+* @return Returns `true` when the event is ahead of the offset. Otherwise, it returns `false`
*/
-default boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
+default boolean isEventAheadOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
Contributor

Nit: update the comment on the interface method.

return false;
}

@@ -162,7 +162,7 @@ protected AirbyteMessage computeNext() {
if (checkpointOffsetToSend.size() == 1
&& changeEventIterator.hasNext()
&& !event.isSnapshotEvent()
-&& targetPosition.isRecordBehindOffset(checkpointOffsetToSend, event)) {
+&& targetPosition.isEventAheadOffset(checkpointOffsetToSend, event)) {
sendCheckpointMessage = true;
}
}
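
The condition in this hunk can be read as a single predicate: a checkpoint is sent only when exactly one offset is pending, more events remain, the current event is not a snapshot event, and the event is ahead of the pending offset. Below is a minimal sketch of that predicate. `CheckpointDecisionSketch`, `TargetPosition`, and `shouldSendCheckpoint` are hypothetical names introduced for illustration; they are not the connector's classes, and the boolean parameters stand in for `changeEventIterator.hasNext()` and `event.isSnapshotEvent()`.

```java
import java.util.Map;

/** Hypothetical stand-in for the iterator's checkpoint decision; not the connector's real classes. */
final class CheckpointDecisionSketch {

  /** Assumed, reduced view of a CDC target position with only the renamed method. */
  interface TargetPosition<E> {
    boolean isEventAheadOffset(Map<String, String> offset, E event);
  }

  /** Mirrors the four conditions of the hunk above, in the same order. */
  static <E> boolean shouldSendCheckpoint(
      final Map<String, String> checkpointOffsetToSend,
      final boolean hasMoreEvents,
      final boolean isSnapshotEvent,
      final E event,
      final TargetPosition<E> targetPosition) {
    return checkpointOffsetToSend.size() == 1
        && hasMoreEvents
        && !isSnapshotEvent
        && targetPosition.isEventAheadOffset(checkpointOffsetToSend, event);
  }
}
```

Because the checks short-circuit, the position comparison only runs once the cheaper conditions hold, which matches the ordering in the diff.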
@@ -4,6 +4,8 @@

package io.airbyte.integrations.debezium.internals.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.debezium.CdcTargetPosition;
import io.airbyte.integrations.debezium.internals.ChangeEventWithMetadata;
@@ -102,6 +104,43 @@ public boolean isHeartbeatSupported() {
return true;
}

@Override
public boolean isEventAheadOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
if (offset.size() != 1) {
return false;
}

final String eventFileName = event.eventValueAsJson().get("source").get("file").asText();
Contributor

Is there a case where this node would not exist?
Do we need to check that it is there?

final long eventPosition = event.eventValueAsJson().get("source").get("pos").asLong();

final JsonNode offsetJson = Jsons.deserialize((String) offset.values().toArray()[0]);

final String offsetFileName = offsetJson.get("file").asText();
final long offsetPosition = offsetJson.get("pos").asLong();
if (eventFileName.compareTo(offsetFileName) != 0) {
return eventFileName.compareTo(offsetFileName) > 0;
}

return eventPosition > offsetPosition;
Contributor

Judging by the code here, we're evaluating whether the position associated with the event is past the offset (which is in line with the logic of sending state messages). Do you think it makes sense to rename this interface method?

Contributor

Yup! I agree with Akash, let's change the method name to `isEventAheadOffset`.

}
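
On the question of missing nodes: a minimal sketch of the same file-then-position comparison with an explicit guard, so a missing `file` or `pos` yields `false` instead of a `NullPointerException`. It uses plain maps rather than Jackson nodes to stay self-contained; `BinlogOffsetSketch` and `isEventAhead` are hypothetical names, and returning `false` on missing data is an assumption, not behavior confirmed by this PR. As in the method above, file names are compared lexicographically, which assumes fixed-width numeric suffixes.

```java
import java.util.Map;

/** Hypothetical, map-based variant of the binlog comparison; not the connector's implementation. */
final class BinlogOffsetSketch {

  /**
   * Returns true when the event's (file, pos) is strictly ahead of the offset's (file, pos).
   * Assumption: any missing or wrongly typed field means "not ahead".
   */
  static boolean isEventAhead(final Map<String, ?> eventSource, final Map<String, ?> offset) {
    if (eventSource == null || offset == null) {
      return false;
    }
    final Object eventFile = eventSource.get("file");
    final Object eventPos = eventSource.get("pos");
    final Object offsetFile = offset.get("file");
    final Object offsetPos = offset.get("pos");
    if (!(eventFile instanceof String) || !(offsetFile instanceof String)
        || !(eventPos instanceof Number) || !(offsetPos instanceof Number)) {
      return false;
    }
    // Compare binlog file names first; only fall back to the position within the same file.
    final int byFile = ((String) eventFile).compareTo((String) offsetFile);
    if (byFile != 0) {
      return byFile > 0;
    }
    return ((Number) eventPos).longValue() > ((Number) offsetPos).longValue();
  }
}
```

Whether to return `false` or fail loudly on a malformed event is a design choice; returning `false` merely skips the intermediate checkpoint for that event.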

@Override
public boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
if ((offsetA == null || offsetA.size() != 1) || (offsetB == null || offsetB.size() != 1)) {
return false;
}

final JsonNode offsetJsonA = Jsons.deserialize((String) offsetA.values().toArray()[0]);
final String offsetAFileName = offsetJsonA.get("file").asText();
final long offsetAPosition = offsetJsonA.get("pos").asLong();

final JsonNode offsetJsonB = Jsons.deserialize((String) offsetB.values().toArray()[0]);
final String offsetBFileName = offsetJsonB.get("file").asText();
final long offsetBPosition = offsetJsonB.get("pos").asLong();

return offsetAFileName.equals(offsetBFileName) && offsetAPosition == offsetBPosition;
}

@Override
public MySqlCdcPosition extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset) {
return new MySqlCdcPosition(sourceOffset.get("file").toString(), (Long) sourceOffset.get("pos"));
@@ -94,7 +94,7 @@ public Long extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset
}

@Override
-public boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
+public boolean isEventAheadOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
if (offset.size() != 1) {
return false;
}
@@ -24,6 +24,6 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=3.0.0
+LABEL io.airbyte.version=3.0.1

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
@@ -11,7 +11,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
-dockerImageTag: 3.0.0
+dockerImageTag: 3.0.1
dockerRepository: airbyte/source-mysql-strict-encrypt
githubIssueLabel: source-mysql
icon: mysql.svg
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
@@ -24,6 +24,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=3.0.0
+LABEL io.airbyte.version=3.0.1

LABEL io.airbyte.name=airbyte/source-mysql
@@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
-dockerImageTag: 3.0.0
+dockerImageTag: 3.0.1
dockerRepository: airbyte/source-mysql
githubIssueLabel: source-mysql
icon: mysql.svg
@@ -31,6 +31,11 @@ public MySqlCdcStateHandler(final StateManager stateManager) {
this.stateManager = stateManager;
}

@Override
public boolean isCdcCheckpointEnabled() {
return true;
}

@Override
public AirbyteMessage saveState(final Map<String, String> offset, final String dbHistory) {
final Map<String, Object> state = new HashMap<>();
@@ -304,6 +304,47 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
"Expected 46 records to be replicated in the second sync.");
}

/**
* This test verifies that multiple states are sent during the CDC process based on number of records.
* We can ensure that more than one `STATE` type of message is sent, but we are not able to assert
* the exact number of messages sent, as it depends on Debezium.
*
* @throws Exception Exception happening in the test.
*/
@Test
protected void verifyCheckpointStatesByRecords() throws Exception {
// We require a large number of records; otherwise Debezium will report the last offset directly.
Contributor

IIRC there is a config option to control the number of state messages, so that it can be set to something smaller. Example:

Can we use the same config to override the default state emission frequency for CDC incremental runs?

Contributor

If you set that in the config, it will be respected:

config.get("sync_checkpoint_seconds") != null ? Duration.ofSeconds(config.get("sync_checkpoint_seconds").asLong())

Contributor

By default, the tests already set `sync_checkpoint_records` to 1:
.put("sync_checkpoint_records", 1)

Contributor Author

We are already overriding it to 1, as Subodh said.

Contributor

I see. I guess my question is: why do we need to create such a large number of records to test this? Is it because the offset (binlog file/pos) doesn't advance unless we insert that many records?

Contributor Author

That is a good question. I don't think we need that many, but 20k records isn't a lot of records anyway. We can reduce this if we need to. @subodh1810 thoughts?

Contributor

We need this many records because the offset file might not always be available to read (Debezium edits it in parallel).

final int recordsToCreate = 20000;

final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
.toListAndClose(firstBatchIterator);
final List<AirbyteStateMessage> stateMessages = extractStateMessages(dataFromFirstBatch);

// As the first `read` operation is from the snapshot, it generates only one state message at the end
// of the process.
assertExpectedStateMessages(stateMessages);

for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"F-" + recordsCreated));
writeModelRecord(record);
}

final JsonNode stateAfterFirstSync = Jsons.jsonNode(Collections.singletonList(stateMessages.get(stateMessages.size() - 1)));
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, stateAfterFirstSync);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
.toListAndClose(secondBatchIterator);
assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size());
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch);
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state.");
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states.");
}
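
The two closing assertions of this test check that more than one state was emitted and that no state is repeated. A minimal, framework-free sketch of those checks follows. `CheckpointStateAssertionsSketch` and `assertMultipleDistinctStates` are hypothetical names, the element type is generic rather than `AirbyteStateMessage`, and distinctness relies on the element's `equals`/`hashCode`, just as `distinct()` does in the test.

```java
import java.util.HashSet;
import java.util.List;

/** Hypothetical helper restating the test's two final assertions without JUnit. */
final class CheckpointStateAssertionsSketch {

  /** Fails unless the list holds more than one state and every state is distinct. */
  static <T> void assertMultipleDistinctStates(final List<T> states) {
    if (states.size() <= 1) {
      throw new AssertionError("Generated only the final state.");
    }
    // A set drops duplicates, so a size mismatch means at least one state was repeated.
    if (new HashSet<>(states).size() != states.size()) {
      throw new AssertionError("There are duplicated states.");
    }
  }
}
```

As the Javadoc above notes, the exact number of states is not asserted, only a lower bound and uniqueness.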

protected void assertStateForSyncShouldHandlePurgedLogsGracefully(final List<AirbyteStateMessage> stateMessages, final int syncNumber) {
assertExpectedStateMessages(stateMessages);
}
2 changes: 1 addition & 1 deletion docs/integrations/sources/mysql-migrations.md
@@ -1,4 +1,4 @@
-# Source Mysql Migration Guide
+# MySQL Migration Guide

## Upgrading to 3.0.0
CDC syncs now have a default cursor field called `_ab_cdc_cursor`. You will need to force normalization to rebuild your destination tables by manually dropping the SCD tables, refreshing the connection schema (skipping the reset), and running a sync. Alternatively, you can just run a reset.
5 changes: 3 additions & 2 deletions docs/integrations/sources/mysql.md
@@ -263,8 +263,9 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura
## Changelog

| Version | Date | Pull Request | Subject |
-| :------ | :--------- | :--------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------- |
-| 3.0.0 | 2023-08-08 | [28756](https://github.com/airbytehq/airbyte/pull/28756) | Set a default cursor for Cdc mode |
+|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
+| 3.0.1 | 2023-08-21 | [29308](https://github.com/airbytehq/airbyte/pull/29308) | CDC: Enable frequent state emissions during incremental runs |
+| 3.0.0 | 2023-08-08 | [28756](https://github.com/airbytehq/airbyte/pull/28756) | CDC: Set a default cursor |
| 2.1.2 | 2023-08-08 | [29220](https://github.com/airbytehq/airbyte/pull/29220) | Add indicator that CDC is the recommended update method |
| 2.1.1 | 2023-07-31 | [28882](https://github.com/airbytehq/airbyte/pull/28882) | Improve replication method labels and descriptions |
| 2.1.0 | 2023-06-26 | [27737](https://github.com/airbytehq/airbyte/pull/27737) | License Update: Elv2 |