✨ Source MongoDB Internal POC: Use global state for CDC #29678

Closed
jdpgrailsdev wants to merge 5 commits from jonathan/correct-cdc-state-type

Conversation

jdpgrailsdev
Contributor

What

  • Use correct state type for CDC connector

How

  • Switch state type to GLOBAL

Recommended reading order

  1. MongoDbStateIterator.java

@github-actions
Contributor

github-actions bot commented Aug 21, 2023

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file with any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

@github-actions
Contributor

source-mongodb-internal-poc test report (commit f7c3e46e81) - ❌

⏲️ Total pipeline duration: 05mn26s

Step Result
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Build connector tar
Build source-mongodb-internal-poc docker image for platform linux/x86_64
./gradlew :airbyte-integrations:connectors:source-mongodb-internal-poc:integrationTest
Acceptance tests

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests only run on PRs that are ready for review. Set your PR to draft mode to avoid flooding the CI engine and upstream services on subsequent commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool, using the following command:

airbyte-ci connectors --name=source-mongodb-internal-poc test

@github-actions
Contributor

source-mongodb-internal-poc test report (commit 03164d05e4) - ❌

⏲️ Total pipeline duration: 07mn55s

Step Result
Java Connector Unit Tests
Build connector tar
Build source-mongodb-internal-poc docker image for platform linux/x86_64
Java Connector Integration Tests
Acceptance tests
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml
Connector version semver check
Connector version increment check
QA checks

@github-actions
Contributor

source-mongodb-internal-poc test report (commit 01ccf7fc06) - ❌

⏲️ Total pipeline duration: 09mn00s

Step Result
Java Connector Unit Tests
Build connector tar
Build source-mongodb-internal-poc docker image for platform linux/x86_64
Java Connector Integration Tests
Acceptance tests
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml
Connector version semver check
Connector version increment check
QA checks

@@ -112,7 +112,7 @@ public AirbyteMessage next() {
 }

 final var stateMessage = new AirbyteStateMessage()
-    .withType(AirbyteStateType.STREAM)
+    .withType(AirbyteStateType.GLOBAL)
Contributor

Is this all we need to do? I would expect a bit more than that. Based on my understanding of STREAM vs. GLOBAL state, I would expect us to set the state of all streams in a single message when using the GLOBAL type.

Contributor Author

We are going to need to change what we put in there, but we are not there yet. This should still work for now.

Contributor

We need to create an AirbyteGlobalState object and then set its fields. Something like:

    // debezium_state and array_containing_all_the_streams are placeholders
    final AirbyteGlobalState globalState = new AirbyteGlobalState();
    globalState.setSharedState(debezium_state);                    // offset shared by all streams
    globalState.setStreamStates(array_containing_all_the_streams); // one entry per stream

    final var stateMessage = new AirbyteStateMessage()
        .withType(AirbyteStateType.GLOBAL)
        .withGlobal(globalState);
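
To make the shape of that suggestion concrete, here is a minimal, self-contained sketch. It does not use the real Airbyte protocol classes: `StateType`, `StreamState`, `GlobalState`, `StateMessage`, and `buildGlobalStateMessage` are simplified, hypothetical stand-ins, and the offset and cursor values are made up. It only illustrates the idea that a single GLOBAL message carries one shared CDC offset plus the state of every stream.

```java
import java.util.List;
import java.util.Map;

// Simplified stand-ins for illustration only; NOT the real Airbyte protocol classes.
class GlobalStateSketch {

  enum StateType { STREAM, GLOBAL }

  // Per-stream cursor, e.g. the last document id seen for one collection.
  record StreamState(String streamName, Map<String, String> cursor) {}

  // One shared CDC offset plus the state of every stream in the sync.
  record GlobalState(Map<String, String> sharedState, List<StreamState> streamStates) {}

  record StateMessage(StateType type, GlobalState global) {}

  // Builds a single GLOBAL message that covers all streams at once.
  static StateMessage buildGlobalStateMessage(
      final Map<String, String> cdcOffset, final List<StreamState> allStreams) {
    final GlobalState globalState =
        new GlobalState(Map.copyOf(cdcOffset), List.copyOf(allStreams));
    return new StateMessage(StateType.GLOBAL, globalState);
  }

  public static void main(final String[] args) {
    // Hypothetical offset and cursors, purely for demonstration.
    final StateMessage message = buildGlobalStateMessage(
        Map.of("resume_token", "example-token"),
        List.of(
            new StreamState("users", Map.of("id", "64c0")),
            new StreamState("orders", Map.of("id", "64c1"))));
    System.out.println(message.type() + " state covering "
        + message.global().streamStates().size() + " streams");
  }
}
```

The difference from the STREAM type is that a STREAM message carries the state of one stream only, so a sync over several collections would emit several messages instead of one.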

@@ -112,7 +112,7 @@ public AirbyteMessage next() {
 }

 final var stateMessage = new AirbyteStateMessage()
-    .withType(AirbyteStateType.STREAM)
+    .withType(AirbyteStateType.GLOBAL)
     .withStream(streamState);
Contributor

Similar to the comment above: I would guess that we need to change this to withGlobal.

@@ -141,7 +141,7 @@ protected Map<String, MongodbStreamState> convertState(final JsonNode state) {

 // TODO add namespace support?
 return states.stream()
-    .filter(s -> s.getType() == AirbyteStateType.STREAM)
+    .filter(s -> s.getType() == AirbyteStateType.GLOBAL)
     .map(s -> new CollectionNameState(
         Optional.ofNullable(s.getStream().getStreamDescriptor()).map(StreamDescriptor::getName),
Contributor

Similar to the withGlobal comment: we should probably be using getGlobal here.
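
As a rough illustration of what reading from the global state could look like, the sketch below pulls per-stream state out of GLOBAL messages. It is self-contained and uses hypothetical stand-in records (`StreamState`, `GlobalState`, `StateMessage`) rather than the real Airbyte protocol classes, and its `convertState` returns a plain map instead of the connector's `MongodbStreamState` values. The point is only that, for a GLOBAL message, the per-stream entries are nested under the global object and the top-level stream field is empty.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for illustration only; NOT the real Airbyte protocol classes.
class ConvertGlobalStateSketch {

  enum StateType { STREAM, GLOBAL }

  record StreamState(String streamName, String lastId) {}

  record GlobalState(String sharedOffset, List<StreamState> streamStates) {}

  // For a GLOBAL message, `stream` is null and `global` holds all stream states.
  record StateMessage(StateType type, StreamState stream, GlobalState global) {}

  // Collects the last seen id per stream name from all GLOBAL messages.
  static Map<String, String> convertState(final List<StateMessage> states) {
    final Map<String, String> byStream = new LinkedHashMap<>();
    states.stream()
        .filter(s -> s.type() == StateType.GLOBAL)
        .filter(s -> s.global() != null)
        // Read the nested per-stream entries instead of the top-level stream field.
        .flatMap(s -> s.global().streamStates().stream())
        .forEach(ss -> byStream.put(ss.streamName(), ss.lastId()));
    return byStream;
  }

  public static void main(final String[] args) {
    // Hypothetical saved state, purely for demonstration.
    final List<StateMessage> saved = List.of(
        new StateMessage(StateType.GLOBAL, null,
            new GlobalState("example-offset", List.of(
                new StreamState("users", "64c0"),
                new StreamState("orders", "64c1")))));
    System.out.println(convertState(saved));
  }
}
```

Keeping the filter on the state type while still reading the top-level stream field, as in the hunk above, would dereference a field that a GLOBAL message does not populate.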

@github-actions
Contributor

source-mongodb-internal-poc test report (commit 1c4ca47528) - ❌

⏲️ Total pipeline duration: 08mn58s

Step Result
Java Connector Unit Tests
Build connector tar
Build source-mongodb-internal-poc docker image for platform linux/x86_64
Java Connector Integration Tests
Acceptance tests
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml
Connector version semver check
Connector version increment check
QA checks

@github-actions
Contributor

source-mongodb-internal-poc test report (commit e50a6fcf0a) - ❌

⏲️ Total pipeline duration: 10mn00s

Step Result
Java Connector Unit Tests
Build connector tar
Build source-mongodb-internal-poc docker image for platform linux/x86_64
Java Connector Integration Tests
Acceptance tests
Validate airbyte-integrations/connectors/source-mongodb-internal-poc/metadata.yaml
Connector version semver check
Connector version increment check
QA checks

@jdpgrailsdev
Contributor Author

Closing in favor of #29763

@jdpgrailsdev jdpgrailsdev deleted the jonathan/correct-cdc-state-type branch August 23, 2023 17:44
Labels
area/connectors Connector related issues
4 participants