[Source-MySQL]CDC: Enable frequent state emission during incremental runs #29308
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest | ❌ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql test
  return false;
}

final String eventFileName = event.eventValueAsJson().get("source").get("file").asText();
Is there a case where this node would not exist?
Do we need to check that it is there?
final String offsetBFileName = offsetJsonB.get("file").asText();
final long offsetBPosition = offsetJsonB.get("pos").asLong();

if (offsetAFileName.compareTo(offsetBFileName) != 0) {
isn't this redundant?
Also, can we just simplify this to:
if (!offsetAFileName.equals(offsetBFileName)) {
Actually, just:
return offsetAFileName.equals(offsetBFileName) && offsetAPosition == offsetBPosition;
I prefer return offsetAFileName.equals(offsetBFileName) && offsetAPosition == offsetBPosition;
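For reference, the single-expression form suggested in this thread can be sketched as follows. This is a minimal illustration, not the connector's code: `BinlogOffsetEquality` and `BinlogOffset` are hypothetical names standing in for the offset values that the PR parses out of the Debezium offset JSON (`file` and `pos`).

```java
import java.util.Objects;

public class BinlogOffsetEquality {

  /** Hypothetical holder for the "file" and "pos" fields of a parsed offset. */
  public static final class BinlogOffset {
    public final String file;
    public final long pos;

    public BinlogOffset(final String file, final long pos) {
      this.file = file;
      this.pos = pos;
    }
  }

  /** Two offsets are the same only when both the binlog file name and the position match. */
  public static boolean isSameOffset(final BinlogOffset offsetA, final BinlogOffset offsetB) {
    // Both null checks live in one conditional.
    if (offsetA == null || offsetB == null) {
      return false;
    }
    return Objects.equals(offsetA.file, offsetB.file) && offsetA.pos == offsetB.pos;
  }

  public static void main(final String[] args) {
    final BinlogOffset a = new BinlogOffset("mysql-bin.000003", 154L);
    final BinlogOffset b = new BinlogOffset("mysql-bin.000003", 154L);
    final BinlogOffset c = new BinlogOffset("mysql-bin.000004", 154L);
    System.out.println(isSameOffset(a, b));
    System.out.println(isSameOffset(a, c));
  }
}
```

Using `equals` on the file names expresses the intent (equality) more directly than `compareTo(...) != 0`, and the combined `&&` removes the separate early return.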

@Override
public boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
  if (offsetA == null || offsetA.size() != 1) {
nit: the checks for both offsets can be made in one conditional
*/
@Test
protected void verifyCheckpointStatesByRecords() throws Exception {
  // We require a huge amount of records, otherwise Debezium will notify directly the last offset.
IIRC there is a config option to control the number of state messages, so that it can be set to something smaller (example: line 106 in a9dacb9).
Can we use the same config to override the default state emission frequency for CDC incremental runs?
If you set that in the config, it will be respected (line 117 in a9dacb9):
config.get("sync_checkpoint_seconds") != null ? Duration.ofSeconds(config.get("sync_checkpoint_seconds").asLong())
By default, the tests already set sync_checkpoint_records to 1:
.put("sync_checkpoint_records", 1)
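To make the override behaviour discussed here concrete, the following is a rough sketch of reading the two checkpoint settings with a fallback. It is an assumption-laden illustration: a plain `Map` stands in for the connector's JSON config, the class name `CheckpointConfig` is hypothetical, the fallback values are placeholders chosen for the example, and treating `sync_checkpoint_records` the same way as `sync_checkpoint_seconds` is assumed rather than taken from the quoted code.

```java
import java.time.Duration;
import java.util.Map;

public class CheckpointConfig {

  // Placeholder fallbacks for this illustration only; not confirmed connector defaults.
  public static final Duration FALLBACK_DURATION = Duration.ofMinutes(15);
  public static final long FALLBACK_RECORDS = 10_000L;

  /** Uses "sync_checkpoint_seconds" when present, otherwise the fallback duration. */
  public static Duration checkpointDuration(final Map<String, Object> config) {
    final Object seconds = config.get("sync_checkpoint_seconds");
    return seconds != null ? Duration.ofSeconds(((Number) seconds).longValue()) : FALLBACK_DURATION;
  }

  /** Uses "sync_checkpoint_records" when present, otherwise the fallback record count. */
  public static long checkpointRecords(final Map<String, Object> config) {
    final Object records = config.get("sync_checkpoint_records");
    return records != null ? ((Number) records).longValue() : FALLBACK_RECORDS;
  }

  public static void main(final String[] args) {
    // Mirrors the test setup mentioned in this thread: a state message after every record.
    final Map<String, Object> testConfig = Map.of("sync_checkpoint_records", 1);
    System.out.println(checkpointRecords(testConfig));
    System.out.println(checkpointDuration(testConfig));
  }
}
```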
We are already overriding it to 1, like Subodh said.
I see. I guess my question is: why do we need to create such a large number of records to test this? Is it because the offset (binlog file/position) doesn't advance unless we insert that many records?
That is a good question. I don't think we need that many, but 20k records isn't a lot of records anyway. We can reduce this if we need to. @subodh1810 thoughts?
We need so many records because the offset file might not always be available to read (being edited by Debezium in parallel)
  return eventFileName.compareTo(offsetFileName) > 0;
}

return eventPosition > offsetPosition;
Judging by the code here, we're evaluating whether the offset associated with the event is past the given offset (which is in line with the logic of sending state messages). Do you think it makes sense to rename this interface method?
Yup! I agree with Akash, let's change the method name to isEventAheadOffset
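The comparison behind the proposed name can be sketched in isolation as below. This is a simplified stand-in, not the PR's method: `BinlogPositionOrder` is a hypothetical class, and the file names and positions are passed in directly instead of being extracted from the offset map and the change event.

```java
public class BinlogPositionOrder {

  /**
   * Returns true when the event's (file, position) is strictly after the offset's.
   * File names are compared first; positions only matter within the same file.
   */
  public static boolean isEventAheadOffset(final String eventFileName,
                                           final long eventPosition,
                                           final String offsetFileName,
                                           final long offsetPosition) {
    final int fileOrder = eventFileName.compareTo(offsetFileName);
    if (fileOrder != 0) {
      // Lexicographic order matches binlog order only while the numeric
      // suffixes have the same width (e.g. 000003 vs 000004).
      return fileOrder > 0;
    }
    return eventPosition > offsetPosition;
  }

  public static void main(final String[] args) {
    System.out.println(isEventAheadOffset("mysql-bin.000004", 4L, "mysql-bin.000003", 900L));
    System.out.println(isEventAheadOffset("mysql-bin.000003", 100L, "mysql-bin.000003", 100L));
  }
}
```

An event in a later binlog file counts as ahead even when its position number is smaller, which is why the file name is checked before the position.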
Some minor comments + questions
Branch updated: a009e10 to c291768
@@ -61,7 +61,7 @@ default boolean isHeartbeatSupported() {
   * @param event Event from the CDC load
   * @return Returns `true` when the record is behind the offset. Otherwise, it returns `false`
   */
- default boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
+ default boolean isEventAheadOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
nit: update the comment on the interface method
combined conditionals for cleanliness
Bumped connector versions
Branch updated: c291768 to 57abadf
/legacy-test connector=connectors/source-mysql
/legacy-test connector=connectors/source-mysql-strict-encrypt
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ❌ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest | ❌ |
Acceptance tests | ✅ |
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ❌ |
Build connector tar | ✅ |
Build source-mysql-strict-encrypt docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mysql-strict-encrypt:integrationTest | ❌ |
Acceptance tests | ❌ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql-strict-encrypt test
/legacy-test connector=connectors/source-mysql
/legacy-test connector=connectors/source-mysql-strict-encrypt
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest | ❌ |
Acceptance tests | ✅ |
/approve-and-merge reason="All tests passed locally. Merging as CI is flaky"
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
Step | Result |
---|---|
Java Connector Unit Tests | ❌ |
Build connector tar | ✅ |
Build source-mysql-strict-encrypt docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ❌ |
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
What
Enable checkpointing for MySQL CDC mode during incremental sync runs
How
Overrode 2 methods in MySqlCdcTargetPosition, isRecordBehindOffset and isSameOffset, so that the computeNext method of DebeziumStateDecoratingIterator can verify that the Airbyte record processing events are not ahead of what Debezium has already processed, and emit Airbyte state messages accordingly.
Sync log line indicating that checkpointing state has been emitted:
Recommended reading order
MySqlCdcTargetPosition.java
MySqlCdcStateHandler.java
CdcMysqlSourceTest.java
🚨 User Impact 🚨
Users should not see any functional impact to their syncs. An extra log line and checkpointing states will be emitted on a cadence.
Testing plan
Doc