Skip to content

[Source-MySQL]CDC: Enable frequent state emission during incremental runs #29308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 22, 2023

Conversation

nguyenaiden
Copy link
Contributor

@nguyenaiden nguyenaiden commented Aug 9, 2023

What

Enable checkpointing for MySQL CDC mode during incremental sync runs

How

Overrode 2 methods in MysqlCdcTargetPosition - isRecordBehindOffset and isSameOffset so that in the computeNext method of DebeziumStateDecoratingIterator to verify that the airbyte record processing events are not ahead of what Debezium has already processed and emit Airbyte state messages accordingly.
Sync log line indicating that checkpointing state has been emitted:
image

Recommended reading order

  1. MySqlCdcTargetPosition.java
  2. MySqlCdcStateHandler.java
  3. CdcMysqlSourceTest.java

🚨 User Impact 🚨

Users should not see any functional impact to their syncs. An extra log line and checkpointing states will be emitted on a cadence.

Testing plan

Doc

@github-actions
Copy link
Contributor

github-actions bot commented Aug 9, 2023

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

@nguyenaiden nguyenaiden marked this pull request as ready for review August 9, 2023 20:50
@nguyenaiden nguyenaiden requested a review from a team as a code owner August 9, 2023 20:50
@octavia-squidington-iii
Copy link
Collaborator

source-mysql test report (commit 10fc23225c) - ❌

⏲️ Total pipeline duration: 20mn21s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql test

@octavia-squidington-iii
Copy link
Collaborator

source-mysql test report (commit 10fc23225c) - ❌

⏲️ Total pipeline duration: 19mn32s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql test

@octavia-squidington-iii
Copy link
Collaborator

source-mysql test report (commit 3b43f276d7) - ❌

⏲️ Total pipeline duration: 21mn30s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql test

@octavia-squidington-iii
Copy link
Collaborator

source-mysql test report (commit a009e1015b) - ❌

⏲️ Total pipeline duration: 20mn12s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql test

return false;
}

final String eventFileName = event.eventValueAsJson().get("source").get("file").asText();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case node would not exist?
need to check if there?

final String offsetBFileName = offsetJsonB.get("file").asText();
final long offsetBPosition = offsetJsonB.get("pos").asLong();

if (offsetAFileName.compareTo(offsetBFileName) != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this redundant?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also just can we just simplify this to :
if ( (!offSetAFileName.equals(offsetBFileName)) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually just
return offSetAFileName.equals(offsetBFileName) && offsetAPosition == offsetBPosition

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer return offSetAFileName.equals(offsetBFileName) && offsetAPosition == offsetBPosition


@Override
public boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
if (offsetA == null || offsetA.size() != 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit : can make checks for both offsets in one conditional

*/
@Test
protected void verifyCheckpointStatesByRecords() throws Exception {
// We require a huge amount of records, otherwise Debezium will notify directly the last offset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC there is a config option to control the number of state messages? so that it can be set to something smaller : example :

Can we use the same config to override the default state emission frequency for cdc incremental runs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you set that in the config, it will be respected :

config.get("sync_checkpoint_seconds") != null ? Duration.ofSeconds(config.get("sync_checkpoint_seconds").asLong())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default for the tests we are already setting the sync_checkpoint_records to 1
.put("sync_checkpoint_records", 1) (here)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already overriding it to 1 like Subodh said.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I guess my question is why do we need a large number of records to create to test this? Is this because the offset (bin log file/pos) doesn't advance unless we are inserting such a large amount of records?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good question. I don't think we need that many, but 20k records isn't a lot of records anyway. We can reduce this if we need to. @subodh1810 thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need so many records because the offset file might not always be available to read (being edited by Debezium in parallel)

return eventFileName.compareTo(offsetFileName) > 0;
}

return eventPosition > offsetPosition;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Judging by the code here we're evaluating whether the offset associated with the event is past the offset (which is inline with the logic of sending state messages). Do you think it makes sense to rename this interface method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup! Even I agree with Akash, lets change the method name to isEventAheadOffset

Copy link
Contributor

@akashkulk akashkulk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor comments + questions

return eventFileName.compareTo(offsetFileName) > 0;
}

return eventPosition > offsetPosition;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup! Even I agree with Akash, lets change the method name to isEventAheadOffset

final String offsetBFileName = offsetJsonB.get("file").asText();
final long offsetBPosition = offsetJsonB.get("pos").asLong();

if (offsetAFileName.compareTo(offsetBFileName) != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer return offSetAFileName.equals(offsetBFileName) && offsetAPosition == offsetBPosition

*/
@Test
protected void verifyCheckpointStatesByRecords() throws Exception {
// We require a huge amount of records, otherwise Debezium will notify directly the last offset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default for the tests we are already setting the sync_checkpoint_records to 1
.put("sync_checkpoint_records", 1) (here)

@nguyenaiden nguyenaiden force-pushed the mysql-cdc-checkpointing branch from a009e10 to c291768 Compare August 16, 2023 22:45
@@ -61,7 +61,7 @@ default boolean isHeartbeatSupported() {
* @param event Event from the CDC load
* @return Returns `true` when the record is behind the offset. Otherwise, it returns `false`
*/
default boolean isRecordBehindOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
default boolean isEventAheadOffset(final Map<String, String> offset, final ChangeEventWithMetadata event) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit : update the comment on the interface method

@octavia-squidington-iii
Copy link
Collaborator

source-mysql test report (commit c291768937) - ❌

⏲️ Total pipeline duration: 20mn12s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql test

@nguyenaiden
Copy link
Contributor Author

/legacy-test connector=connectors/source-mysql

@nguyenaiden
Copy link
Contributor Author

/legacy-test connector=connectors/source-mysql-strict-encrypt

@github-actions
Copy link
Contributor

source-mysql test report (commit 57abadf4da) - ❌

⏲️ Total pipeline duration: 20mn51s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql test

@github-actions
Copy link
Contributor

source-mysql-strict-encrypt test report (commit 57abadf4da) - ❌

⏲️ Total pipeline duration: 12mn59s

Step Result
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql-strict-encrypt docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql-strict-encrypt:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql-strict-encrypt test

@pedroslopez
Copy link
Contributor

/legacy-test connector=connectors/source-mysql

@pedroslopez
Copy link
Contributor

/legacy-test connector=connectors/source-mysql-strict-encrypt

@github-actions
Copy link
Contributor

source-mysql test report (commit d4f68b0def) - ❌

⏲️ Total pipeline duration: 19mn43s

Step Result
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql test

@github-actions
Copy link
Contributor

source-mysql-strict-encrypt test report (commit d4f68b0def) - ❌

⏲️ Total pipeline duration: 11mn42s

Step Result
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mysql-strict-encrypt docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mysql-strict-encrypt:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql-strict-encrypt test

@nguyenaiden
Copy link
Contributor Author

/approve-and-merge reason="All tests passed locally. Merging as CI is flaky"

@octavia-approvington
Copy link
Contributor

It's time
fine lets go

@octavia-approvington octavia-approvington merged commit b7dc462 into master Aug 22, 2023
@octavia-approvington octavia-approvington deleted the mysql-cdc-checkpointing branch August 22, 2023 01:13
@github-actions
Copy link
Contributor

source-mysql test report (commit 54bba8bdc4) - ❌

⏲️ Total pipeline duration: 36mn34s

Step Result
Java Connector Unit Tests
Build connector tar
Build source-mysql docker image for platform linux/x86_64
Java Connector Integration Tests
Acceptance tests
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml
Connector version semver check
Connector version increment check
QA checks

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql test

@github-actions
Copy link
Contributor

source-mysql-strict-encrypt test report (commit 54bba8bdc4) - ❌

⏲️ Total pipeline duration: 39mn53s

Step Result
Java Connector Unit Tests
Build connector tar
Build source-mysql-strict-encrypt docker image for platform linux/x86_64
Java Connector Integration Tests
Acceptance tests
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml
Connector version semver check
Connector version increment check
QA checks

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-mysql-strict-encrypt test

harrytou pushed a commit to KYVENetwork/airbyte that referenced this pull request Sep 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants