DBZ iterator migration to use SourceStateIterator #36333
...dk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.java
@@ -14,7 +14,7 @@ java {
 airbyteJavaConnector {
     cdkVersionRequired = '0.23.19'
     features = ['db-sources', 'datastore-postgres']
-    useLocalCdk = false
+    useLocalCdk = true
In order to properly validate this change, it needs to be applied to at least one connector.
Yep, this change serves exactly that purpose before releasing the CDK, so all tests on postgres would pass with this change. Once approved, I'll need to release the CDK and bump up the required version here too.
...es/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.java
}
if (offsetManager == null) {
  throw new RuntimeException("Offset can not be null");
}
Move this check to the constructor.
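A minimal sketch of what moving the check into the constructor could look like. The class name `OffsetCheckedProducer` and the `OffsetReader` interface are hypothetical stand-ins, not the CDK's actual types; only the `offsetManager` name, the `read()` call, and the error message are taken from the snippets in this review.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the CDK's offset manager; only read() is assumed.
interface OffsetReader {
  Map<String, String> read();
}

// Hypothetical producer that validates its dependency once, at construction time.
final class OffsetCheckedProducer {

  private final OffsetReader offsetManager;
  private final Map<String, String> previousCheckpointOffset;

  OffsetCheckedProducer(final OffsetReader offsetManager) {
    // Fail fast here so that later methods can assume a non-null offset manager.
    if (offsetManager == null) {
      throw new RuntimeException("Offset can not be null");
    }
    this.offsetManager = offsetManager;
    // Copy the stored offset so later mutations do not touch the manager's map.
    this.previousCheckpointOffset = new HashMap<>(offsetManager.read());
  }

  Map<String, String> previousCheckpointOffset() {
    return previousCheckpointOffset;
  }
}
```

With the check in the constructor, an invalid producer can never be built, so per-call methods no longer need to repeat it.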
...es/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.java
  // include all necessary data such as snapshot completion.
  // This is the case for MS SQL Server, at least.
  return createStateMessage(initialOffset);
}
We might be able to get rid of this check entirely at this point. Try it out and see if source-mssql tests pass
cc @akashkulk
It will break some tests. It's minor, though: it turns out the offset in the global state changes slightly:
before:
"mssql_cdc_offset": {
"[\"db_ziwfjgytty\",{\"server\":\"db_ziwfjgytty\",\"database\":\"db_ziwfjgytty\"}]": "{\"commit_lsn\":\"00000029:00000298:0003\",\"snapshot\":true,\"snapshot_completed\":true}"
},
after:
"mssql_cdc_offset": {
"[\"db_ziwfjgytty\",{\"server\":\"db_ziwfjgytty\",\"database\":\"db_ziwfjgytty\"}]": "{\"transaction_id\":null,\"event_serial_no\":0,\"commit_lsn\":\"00000029:00000298:0003\",\"change_lsn\":\"NULL\"}"
},
I'll revert this change for now
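To make the difference between the two offsets easier to see, here is a small sketch that compares their key sets. The class `OffsetKeyDiff` and its helper are hypothetical, and the keys are transcribed by hand from the "before" and "after" values quoted above rather than parsed from a real state message.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for comparing the keys of two offset values.
final class OffsetKeyDiff {

  // Keys transcribed by hand from the "before" and "after" offsets quoted above.
  static final Set<String> BEFORE = Set.of("commit_lsn", "snapshot", "snapshot_completed");
  static final Set<String> AFTER =
      Set.of("transaction_id", "event_serial_no", "commit_lsn", "change_lsn");

  // Returns the keys present in left but absent from right, sorted for stable output.
  static Set<String> missingFrom(final Set<String> left, final Set<String> right) {
    final Set<String> result = new TreeSet<>(left);
    result.removeAll(right);
    return result;
  }
}
```

Calling `OffsetKeyDiff.missingFrom(OffsetKeyDiff.BEFORE, OffsetKeyDiff.AFTER)` yields `snapshot` and `snapshot_completed`, the snapshot-completion data that the code comment above says the initial offset needs to carry.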
...es/src/main/java/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.java
previousCheckpointOffset.clear();
previousCheckpointOffset.putAll(checkpointOffsetToSend);
shouldEmitStateMessage = false;
return stateMessage;
Should we call resetCheckpointValues(), i.e. checkpointOffsetToSend.clear(), before returning a state message?
Also, we can probably replace resetCheckpointValues() with checkpointOffsetToSend.clear().
That's a great catch, thanks! That actually solved a test issue I've had since I enabled this on mssql. Surprisingly, the tests on postgres did not catch it.
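A minimal sketch of the fix discussed in this thread, assuming a simplified tracker in which the state message is just a copy of the pending offset map. The class `CheckpointTracker` and its methods are hypothetical; only the field names follow the snippet above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified holder for the checkpoint bookkeeping.
final class CheckpointTracker {

  private final Map<String, String> previousCheckpointOffset = new HashMap<>();
  private final Map<String, String> checkpointOffsetToSend = new HashMap<>();
  private boolean shouldEmitStateMessage = false;

  // Records an offset that is ready to be checkpointed.
  void markReady(final Map<String, String> offset) {
    checkpointOffsetToSend.clear();
    checkpointOffsetToSend.putAll(offset);
    shouldEmitStateMessage = true;
  }

  // Builds the state payload, remembers it, and then clears the pending offset.
  Map<String, String> emit() {
    final Map<String, String> stateMessage = new HashMap<>(checkpointOffsetToSend);
    previousCheckpointOffset.clear();
    previousCheckpointOffset.putAll(checkpointOffsetToSend);
    shouldEmitStateMessage = false;
    // The step suggested in review: do not carry the sent offset into the next checkpoint.
    checkpointOffsetToSend.clear();
    return stateMessage;
  }

  boolean shouldEmitStateMessage() {
    return shouldEmitStateMessage;
  }

  boolean hasPendingOffset() {
    return !checkpointOffsetToSend.isEmpty();
  }

  Map<String, String> previousCheckpointOffset() {
    return previousCheckpointOffset;
  }
}
```

The payload is copied before the clear, so the returned map stays intact while the tracker starts the next checkpoint with no pending offset.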
this.schemaHistoryManager = schemaHistoryManager;
this.previousCheckpointOffset = (HashMap<String, String>) offsetManager.read();
this.initialOffset = new HashMap<>(this.previousCheckpointOffset);
resetCheckpointValues();
nit: not sure there is a need for this. I understand you're porting over code from before.
Removed. Yeah, the initial value is already empty. Previously it also reset other fields, but I've deleted most of them, so it's not applicable here.
some small nits
/publish-java-cdk
Also manually tested on postgres and mssql on CDC incremental sync.
Move DBZ iterator to use SourceStateIterator
Apply this on postgres