-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[Source-MySQL]CDC: Enable frequent state emission during incremental runs #29308
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from all commits
82756c9
143a24a
a275611
57abadf
d4f68b0
54bba8b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,8 @@ | |
|
||
package io.airbyte.integrations.debezium.internals.mysql; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.commons.json.Jsons; | ||
import io.airbyte.db.jdbc.JdbcDatabase; | ||
import io.airbyte.integrations.debezium.CdcTargetPosition; | ||
import io.airbyte.integrations.debezium.internals.ChangeEventWithMetadata; | ||
|
@@ -102,6 +104,43 @@ public boolean isHeartbeatSupported() { | |
return true; | ||
} | ||
|
||
@Override | ||
public boolean isEventAheadOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) { | ||
if (offset.size() != 1) { | ||
return false; | ||
} | ||
|
||
final String eventFileName = event.eventValueAsJson().get("source").get("file").asText(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a case node would not exist? |
||
final long eventPosition = event.eventValueAsJson().get("source").get("pos").asLong(); | ||
|
||
final JsonNode offsetJson = Jsons.deserialize((String) offset.values().toArray()[0]); | ||
|
||
final String offsetFileName = offsetJson.get("file").asText(); | ||
final long offsetPosition = offsetJson.get("pos").asLong(); | ||
if (eventFileName.compareTo(offsetFileName) != 0) { | ||
return eventFileName.compareTo(offsetFileName) > 0; | ||
} | ||
|
||
return eventPosition > offsetPosition; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Judging by the code here we're evaluating whether the offset associated with the event is past the offset (which is inline with the logic of sending state messages). Do you think it makes sense to rename this interface method? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup! Even I agree with Akash, lets change the method name to |
||
} | ||
|
||
@Override | ||
public boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) { | ||
if ((offsetA == null || offsetA.size() != 1) || (offsetB == null || offsetB.size() != 1)) { | ||
return false; | ||
} | ||
|
||
final JsonNode offsetJsonA = Jsons.deserialize((String) offsetA.values().toArray()[0]); | ||
final String offsetAFileName = offsetJsonA.get("file").asText(); | ||
final long offsetAPosition = offsetJsonA.get("pos").asLong(); | ||
|
||
final JsonNode offsetJsonB = Jsons.deserialize((String) offsetB.values().toArray()[0]); | ||
final String offsetBFileName = offsetJsonB.get("file").asText(); | ||
final long offsetBPosition = offsetJsonB.get("pos").asLong(); | ||
|
||
return offsetAFileName.equals(offsetBFileName) && offsetAPosition == offsetBPosition; | ||
} | ||
|
||
@Override | ||
public MySqlCdcPosition extractPositionFromHeartbeatOffset(final Map<String, ?> sourceOffset) { | ||
return new MySqlCdcPosition(sourceOffset.get("file").toString(), (Long) sourceOffset.get("pos")); | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -304,6 +304,47 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception { | |||||
"Expected 46 records to be replicated in the second sync."); | ||||||
} | ||||||
|
||||||
/** | ||||||
* This test verifies that multiple states are sent during the CDC process based on number of records. | ||||||
* We can ensure that more than one `STATE` type of message is sent, but we are not able to assert | ||||||
* the exact number of messages sent as depends on Debezium. | ||||||
* | ||||||
* @throws Exception Exception happening in the test. | ||||||
*/ | ||||||
@Test | ||||||
protected void verifyCheckpointStatesByRecords() throws Exception { | ||||||
// We require a huge amount of records, otherwise Debezium will notify directly the last offset. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIRC there is a config option to control the number of state messages? so that it can be set to something smaller : example : Line 106 in a9dacb9
Can we use the same config to override the default state emission frequency for cdc incremental runs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you set that in the config, it will be respected : Line 117 in a9dacb9
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By default for the tests we are already setting the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We already overriding it to 1 like Subodh said. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. I guess my question is why do we need a large number of records to create to test this? Is this because the offset (bin log file/pos) doesn't advance unless we are inserting such a large amount of records? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is a good question. I don't think we need that many, but 20k records isn't a lot of records anyway. We can reduce this if we need to. @subodh1810 thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need so many records because the offset file might not always be available to read (being edited by Debezium in parallel) |
||||||
final int recordsToCreate = 20000; | ||||||
|
||||||
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = getSource() | ||||||
.read(getConfig(), CONFIGURED_CATALOG, null); | ||||||
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators | ||||||
.toListAndClose(firstBatchIterator); | ||||||
final List<AirbyteStateMessage> stateMessages = extractStateMessages(dataFromFirstBatch); | ||||||
|
||||||
// As first `read` operation is from snapshot, it would generate only one state message at the end | ||||||
// of the process. | ||||||
assertExpectedStateMessages(stateMessages); | ||||||
|
||||||
for (int recordsCreated = 0; recordsCreated < recordsToCreate; recordsCreated++) { | ||||||
final JsonNode record = | ||||||
Jsons.jsonNode(ImmutableMap | ||||||
.of(COL_ID, 200 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL, | ||||||
"F-" + recordsCreated)); | ||||||
writeModelRecord(record); | ||||||
} | ||||||
|
||||||
final JsonNode stateAfterFirstSync = Jsons.jsonNode(Collections.singletonList(stateMessages.get(stateMessages.size() - 1))); | ||||||
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource() | ||||||
.read(getConfig(), CONFIGURED_CATALOG, stateAfterFirstSync); | ||||||
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators | ||||||
.toListAndClose(secondBatchIterator); | ||||||
assertEquals(recordsToCreate, extractRecordMessages(dataFromSecondBatch).size()); | ||||||
final List<AirbyteStateMessage> stateMessagesCDC = extractStateMessages(dataFromSecondBatch); | ||||||
assertTrue(stateMessagesCDC.size() > 1, "Generated only the final state."); | ||||||
assertEquals(stateMessagesCDC.size(), stateMessagesCDC.stream().distinct().count(), "There are duplicated states."); | ||||||
} | ||||||
|
||||||
protected void assertStateForSyncShouldHandlePurgedLogsGracefully(final List<AirbyteStateMessage> stateMessages, final int syncNumber) { | ||||||
assertExpectedStateMessages(stateMessages); | ||||||
} | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
# Source Mysql Migration Guide | ||
# MySQL Migration Guide | ||
|
||
## Upgrading to 3.0.0 | ||
CDC syncs now have a default cursor field called `_ab_cdc_cursor`. You will need to force normalization to rebuild your destination tables by manually dropping the SCD tables, refreshing the connection schema (skipping the reset), and running a sync. Alternatively, you can just run a reset. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : update the comment on the interface method