-
Notifications
You must be signed in to change notification settings - Fork 4.5k
MySQL CDC sync fails because starting binlog position not found in DB #6425 #9514
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MySQL CDC sync fails because starting binlog position not found in DB #6425 #9514
Conversation
…description into log
/test connector=connectors/source-mysql
|
/test connector=connectors/source-mysql-strict-encrypt
|
/test connector=connectors/source-mysql-strict-encrypt
|
@@ -180,8 +185,12 @@ private static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) { | |||
new AirbyteDebeziumHandler(sourceConfig, MySqlCdcTargetPosition.targetPosition(database), MySqlCdcProperties.getDebeziumProperties(), | |||
catalog, true); | |||
|
|||
return handler.getIncrementalIterators(new MySqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()), | |||
new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt); | |||
CdcState cdcState = stateManager.getCdcStateManager().getCdcState(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could stateManager.getCdcStateManager()
return null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, from what I can see from the code :
public StateManager(final DbState serialized, final ConfiguredAirbyteCatalog catalog) {
this.cdcStateManager = new CdcStateManager(serialized.getCdcState());
this.isCdc = serialized.getCdc();
if (serialized.getCdc() == null) {
this.isCdc = false;
}
new MySqlCdcStateHandler(stateManager), new MySqlCdcConnectorMetadataInjector(), emittedAt); | ||
CdcState cdcState = stateManager.getCdcStateManager().getCdcState(); | ||
MySqlCdcSavedInfoFetcher fetcher = new MySqlCdcSavedInfoFetcher(cdcState); | ||
if (cdcState != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use Optional
type here?
@@ -229,4 +238,42 @@ public static void main(final String[] args) throws Exception { | |||
}; | |||
} | |||
|
|||
private void checkBinlog(JsonNode offset, JdbcDatabase database) { | |||
String binlog = getBinlog(offset); | |||
if (binlog != null && !binlog.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use Optional type here?
LOGGER.info(String.format("Binlog %s is available", binlog)); | ||
} else { | ||
String error = | ||
String.format("Binlog %s is not available. This is a critical error, it means that requested binlog is not present on mysql server. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe, we can use Text block here instead of string concatenation https://openjdk.java.net/jeps/378, as we migrate to Java 17.
Map.Entry<String, JsonNode> jsonField = fields.next(); | ||
return Jsons.deserialize(jsonField.getValue().asText()).path("file").asText(); | ||
} | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we return null here?
/test connector=connectors/source-mysql
|
/test connector=connectors/source-mysql-strict-encrypt
|
String error = | ||
""" | ||
Binlog %s is not available. This is a critical error, it means that requested binlog is not present on mysql server. To fix data synchronization you need to reset your data. Please check binlog retention policy configurations.""" | ||
.formatted(binlog); | ||
LOGGER.error(error); | ||
throw new RuntimeException(""" | ||
Binlog %s is not available.""".formatted(binlog)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this part of the code executed too if binlogOptional.ifPresent
is not true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this part of the code executed too if
binlogOptional.ifPresent
is not true?
-
binlogOptional.ifPresent check that saved cdc_offset contain information about last sync binlog and returns binlog file name
-
if (isBinlogAvailable(binlog, database)) - will check that required for sync binlog file is present on mysql-server
if binlog is not on mysql server anymore only then this part of code will be executed.
@ChristopheDuong can we proceed with publishing this changes? |
/publish connector=connectors/source-mysql
|
/publish connector=connectors/source-mysql-strict-encrypt
|
/test connector=connectors/source-mysql
|
/publish connector=connectors/source-mysql-strict-encrypt
|
/test connector=connectors/source-mysql-strict-encrypt
|
What
Check required binlog for CDC sync on mysql server before data sync and add error description into log if it absent
How
Before start CDC sync implementation will check required binary log on mysql server and if it's absent will add error description about it with corresponding explanation and as possible solution will as to make "reset data"
Recommended reading order
x.java
y.python
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/SUMMARY.md
docs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampledocs/integrations/README.md
airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing./publish
command described hereUpdating a connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampleAirbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing./publish
command described hereConnector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates
then checking in your changesThis change is