-
Notifications
You must be signed in to change notification settings - Fork 4.5k
upgrade debezium version to 1.9.2 for mysql source #14542
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -107,7 +107,7 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob | |||
*/ | |||
@Override | |||
protected void putBoolean(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { | |||
node.put(columnName, resultSet.getInt(index) == 1); | |||
node.put(columnName, resultSet.getInt(index) > 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not related to debezium version upgrade, I found a bug while testing in Full refresh mode so went ahead and fixed it
@@ -40,24 +39,9 @@ public void configure(final Properties props) {} | |||
public void converterFor(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) { | |||
if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { | |||
registerDate(field, registration); | |||
} else if (Arrays.stream(TEXT_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new debezium version handles string values as expected so our custom logic is not required anymore
* {@link #storeRecord(HistoryRecord)} method but {@link FilteredFileDatabaseHistory} is a final | ||
* class and can not be inherited | ||
*/ | ||
public class FilteredFileDatabaseHistory extends AbstractDatabaseHistory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new debezium version filters out the schema history records only for the database that its syncing so our custom logic is not required anymore
/test connector=connectors/source-mysql
Build PassedTest summary info:
|
|
||
final Optional<CdcState> cdcState = Optional.ofNullable(stateManager.getCdcStateManager().getCdcState()); | ||
final MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState.orElse(null)); | ||
cdcState.ifPresent(cdc -> checkBinlog(cdc.getState(), database)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is no longer required cause now if the last saved binlog position in the source database is purged, we would handle it gracefully and sync data from beginning. I have added a test case as well for this scenario.
/test connector=connectors/source-mysql
Build FailedTest summary info:
|
/test connector=connectors/source-mysql
Build PassedTest summary info:
|
I cannot tell from the code if/how this change will break jobs that were created with the current version of MySQL Connector. Could you test that manually? |
@grishick I tested this manually by setting up a mysql connector on version 0.5.15 (current version) in CDC mode and then ran few syncs and then switched the connector to use my local dev version which contained the changes from the PR and it worked fine so this is not going to cause a breaking change. I incorrectly assumed that it would have caused a breaking change cause of failing tests when I had upgraded the debezium version and realised that the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice! just a couple nitpicks
@@ -51,4 +51,9 @@ public AirbyteMessage saveState(final Map<String, String> offset, final String d | |||
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); | |||
} | |||
|
|||
@Override | |||
public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() { | |||
throw new RuntimeException("Snapshot of individual tables currently not supported in MySQL"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doublechecking: this only runs when someone adds new streams to an existing catalog, which isn't currently supported in the UI anyway?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@edgao correct plus the stream state type for mysql is LEGACY so even when the feature would be enabled in the UI, it would not be triggered for mysql
"test_schema", "table_with_tiny_int", "id", "bool_col", "tiny_int_one_col", | ||
"tiny_int_two_col", "id")); | ||
|
||
executeQuery(String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: are these two queries identical? if yes, probably better to loop over data
rather than copy/pasting them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return expectedCatalog; | ||
} | ||
|
||
//TODO : Enable this test once we fix handling of DATETIME values |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which issue is this waiting for? #14628 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup
} | ||
|
||
@Test | ||
protected void syncShouldHandlePurgedLogsGracefully() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this live in the base cdc source test? (and sources could explicitly opt out if it's not supported)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to make this a generic test in another PR
/test connector=connectors/source-mysql
Build PassedTest summary info:
|
* upgrade debezium version to 1.9.2 for mysql source * new debezium version doen't require special string handling * fix log message * add test tor date time data type * gracefully handle the case when logs are purged * fix test * address review comment
Issue : #14425