-
Notifications
You must be signed in to change notification settings - Fork 4.6k
✨ Source MongoDB Internal POC: Use global state for CDC #29678
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Before Merging a Connector Pull RequestWow! What a great pull request you have here! 🎉 To merge this PR, ensure the following has been done/considered for each connector added or updated:
If the checklist is complete, but the CI check is failing,
|
|
Step | Result |
---|---|
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
./gradlew :airbyte-integrations:connectors:source-mongodb-internal-poc:integrationTest | ❌ |
Acceptance tests | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
|
Step | Result |
---|---|
Java Connector Unit Tests | ❌ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
@@ -112,7 +112,7 @@ public AirbyteMessage next() { | |||
} | |||
|
|||
final var stateMessage = new AirbyteStateMessage() | |||
.withType(AirbyteStateType.STREAM) | |||
.withType(AirbyteStateType.GLOBAL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this all we need to do? I would expect to have to do a little more than that. Based on my understanding of stream vs global, I would expect for us to be setting the state of all streams in one same message when using the GLOBAL
type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are going to need to change what we put in there, but we are not there yet. This should still work for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to create an object of the AirbyteGlobalState
and then set the parameters in that object. Something like
final AirbyteGlobalState globalState = new AirbyteGlobalState();
globalState.setSharedState(debezium_state);
globalState.setStreamStates(array_containing_all_the_streams);
final var stateMessage = new AirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL)
.withGlobal(globalState);
@@ -112,7 +112,7 @@ public AirbyteMessage next() { | |||
} | |||
|
|||
final var stateMessage = new AirbyteStateMessage() | |||
.withType(AirbyteStateType.STREAM) | |||
.withType(AirbyteStateType.GLOBAL) | |||
.withStream(streamState); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to the above comment, I would guess that we need to change this to withGlobal
@@ -141,7 +141,7 @@ protected Map<String, MongodbStreamState> convertState(final JsonNode state) { | |||
|
|||
// TODO add namespace support? | |||
return states.stream() | |||
.filter(s -> s.getType() == AirbyteStateType.STREAM) | |||
.filter(s -> s.getType() == AirbyteStateType.GLOBAL) | |||
.map(s -> new CollectionNameState( | |||
Optional.ofNullable(s.getStream().getStreamDescriptor()).map(StreamDescriptor::getName), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to the comment below -> we should probably be using getGlobal
|
Step | Result |
---|---|
Java Connector Unit Tests | ❌ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
|
Step | Result |
---|---|
Java Connector Unit Tests | ❌ |
Build connector tar | ✅ |
Build source-mongodb-internal-poc docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ❌ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mongodb-internal-poc test
Closing in favor of #29763 |
What
How
GLOBAL
Recommended reading order
MongoDbStateIterator.java