upgrade debezium version to 1.9.2 for mysql source #14542

Merged: 10 commits merged on Jul 20, 2022

Conversation

subodh1810
Contributor

@subodh1810 subodh1810 commented Jul 8, 2022

Issue: #14425

@subodh1810 subodh1810 self-assigned this Jul 9, 2022
@@ -107,7 +107,7 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
*/
@Override
protected void putBoolean(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
- node.put(columnName, resultSet.getInt(index) == 1);
+ node.put(columnName, resultSet.getInt(index) > 0);
Contributor Author

This is not related to the Debezium version upgrade. I found a bug while testing in full refresh mode, so I went ahead and fixed it.
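
For illustration, a minimal sketch of why the comparison matters. The class and method names below are hypothetical and not taken from the connector. MySQL treats BOOLEAN as a synonym for TINYINT(1), and such a column can hold values other than 0 and 1, so an `== 1` check reports a value like 2 as false while `> 0` reports it as true.

```java
public class TinyIntBoolean {

  // Old check from the diff: only the exact value 1 maps to true.
  static boolean equalsOne(final int raw) {
    return raw == 1;
  }

  // New check from the diff: any positive value maps to true.
  static boolean greaterThanZero(final int raw) {
    return raw > 0;
  }

  public static void main(final String[] args) {
    // Sample values are assumptions chosen to show where the two checks differ.
    for (final int raw : new int[] {0, 1, 2}) {
      System.out.println(raw + " -> old=" + equalsOne(raw) + ", new=" + greaterThanZero(raw));
    }
  }
}
```

The two checks agree for 0 and 1 and differ only for larger positive values.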

@@ -40,24 +39,9 @@ public void configure(final Properties props) {}
public void converterFor(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerDate(field, registration);
} else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
Contributor Author

@subodh1810 subodh1810 Jul 9, 2022

The new Debezium version handles string values as expected, so our custom logic is no longer required.

* {@link #storeRecord(HistoryRecord)} method but {@link FilteredFileDatabaseHistory} is a final
* class and can not be inherited
*/
public class FilteredFileDatabaseHistory extends AbstractDatabaseHistory {
Contributor Author

@subodh1810 subodh1810 Jul 9, 2022

The new Debezium version stores schema history records only for the database that it's syncing, so our custom filtering logic is no longer required.

@subodh1810 subodh1810 marked this pull request as ready for review July 9, 2022 10:44
@subodh1810 subodh1810 requested a review from a team as a code owner July 9, 2022 10:44
@subodh1810 subodh1810 linked an issue Jul 9, 2022 that may be closed by this pull request
@subodh1810
Contributor Author

subodh1810 commented Jul 9, 2022

/test connector=connectors/source-mysql

🕑 connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2640817796
✅ connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2640817796
No Python unittests run

Build Passed

Test summary info:

All Passed

@subodh1810 subodh1810 requested a review from edgao July 9, 2022 14:56

final Optional<CdcState> cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState());
final MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null));
cdcState.ifPresent(cdc -> checkBinlog(cdc.getState(), database));
Contributor Author

@subodh1810 subodh1810 Jul 9, 2022

This check is no longer required: if the last saved binlog position has been purged from the source database, we now handle it gracefully and sync data from the beginning. I have also added a test case for this scenario.
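
A rough sketch of the decision described in this comment, not the connector's actual code. The class, method, and parameter names are hypothetical, and the list of available binlog files is assumed to come from something like MySQL's `SHOW BINARY LOGS`. If the saved binlog file is no longer on the server, the saved offset is dropped so that the sync starts from the beginning.

```java
import java.util.List;
import java.util.Optional;

public class PurgedBinlogCheck {

  // Returns the saved binlog file name if the server still has it.
  // Returns empty when nothing was saved or the file was purged; the caller
  // is assumed to treat empty as "no saved state, sync from the beginning".
  static Optional<String> usableBinlogFile(final String savedBinlogFile, final List<String> availableBinlogFiles) {
    return Optional.ofNullable(savedBinlogFile).filter(availableBinlogFiles::contains);
  }

  public static void main(final String[] args) {
    // Hypothetical file names for illustration only.
    final List<String> available = List.of("mysql-bin.000004", "mysql-bin.000005");
    System.out.println(usableBinlogFile("mysql-bin.000005", available).isPresent());
    System.out.println(usableBinlogFile("mysql-bin.000001", available).isPresent());
  }
}
```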

@subodh1810 subodh1810 requested a review from grishick July 9, 2022 18:15
@subodh1810
Contributor Author

subodh1810 commented Jul 9, 2022

/test connector=connectors/source-mysql

🕑 connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2641969287
❌ connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2641969287
🐛 https://gradle.com/s/bmbg3lbaq7zha

Build Failed

Test summary info:

Could not find result summary

@subodh1810
Contributor Author

subodh1810 commented Jul 10, 2022

/test connector=connectors/source-mysql

🕑 connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2644179215
✅ connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2644179215
No Python unittests run

Build Passed

Test summary info:

All Passed

@grishick
Contributor

I cannot tell from the code whether or how this change will break jobs that were created with the current version of the MySQL connector. Could you test that manually?

@subodh1810
Contributor Author

@grishick I tested this manually: I set up a MySQL connector on version 0.5.15 (the current version) in CDC mode, ran a few syncs, and then switched the connector to my local dev version containing the changes from this PR. It worked fine, so this is not going to cause a breaking change.

I had incorrectly assumed that it would cause a breaking change because tests were failing after I upgraded the Debezium version; I then realised that the FileDatabaseHistory is no longer required with the latest Debezium version.

Contributor

@edgao edgao left a comment

nice! just a couple nitpicks

@@ -51,4 +51,9 @@ public AirbyteMessage saveState(final Map<String, String> offset, final String d
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage);
}

@Override
public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() {
throw new RuntimeException("Snapshot of individual tables currently not supported in MySQL");
Contributor

doublechecking: this only runs when someone adds new streams to an existing catalog, which isn't currently supported in the UI anyway?

Contributor Author

@edgao correct. Also, the stream state type for MySQL is LEGACY, so even once the feature is enabled in the UI, it would not be triggered for MySQL.

"test_schema", "table_with_tiny_int", "id", "bool_col", "tiny_int_one_col",
"tiny_int_two_col", "id"));

executeQuery(String
Contributor

nitpick: are these two queries identical? if yes, probably better to loop over data rather than copy/pasting them?

Contributor Author

done
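
As a sketch of the suggestion above (the actual test code is truncated in this thread, so everything below is hypothetical, including the class name, the helper, and the row values), repeated insert queries can be generated by looping over the row data:

```java
import java.util.ArrayList;
import java.util.List;

public class InsertStatements {

  // Builds one INSERT statement per row of values instead of repeating
  // a hand-written String.format call for each row.
  static List<String> buildInserts(final String schema, final String table, final List<String> rowValues) {
    final List<String> statements = new ArrayList<>();
    for (final String values : rowValues) {
      statements.add(String.format("INSERT INTO %s.%s VALUES (%s);", schema, table, values));
    }
    return statements;
  }

  public static void main(final String[] args) {
    // Assumed column order: id, bool_col, tiny_int_one_col, tiny_int_two_col.
    buildInserts("test_schema", "table_with_tiny_int", List.of("1, true, 1, 2", "2, false, 0, 3"))
        .forEach(System.out::println);
  }
}
```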

return expectedCatalog;
}

//TODO : Enable this test once we fix handling of DATETIME values
Contributor

which issue is this waiting for? #14628 ?

Contributor Author

yup

}

@Test
protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
Contributor

can this live in the base cdc source test? (and sources could explicitly opt out if it's not supported)

Contributor Author

I plan to make this a generic test in another PR

@subodh1810
Contributor Author

subodh1810 commented Jul 20, 2022

/test connector=connectors/source-mysql

🕑 connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2703506127
✅ connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2703506127
No Python unittests run

Build Passed

Test summary info:

All Passed

@subodh1810 subodh1810 merged commit 83b0f81 into master Jul 20, 2022
@subodh1810 subodh1810 deleted the upgrade-debezium-version-mysql-1-9-2 branch July 20, 2022 10:45
mfsiega-airbyte pushed a commit that referenced this pull request Jul 21, 2022
* upgrade debezium version to 1.9.2 for mysql source

* new debezium version doesn't require special string handling

* fix log message

* add test for date time data type

* gracefully handle the case when logs are purged

* fix test

* address review comment
Successfully merging this pull request may close these issues.

Upgrade debezium version for MySQL source
4 participants