Skip to content

Ensure that all dangling consumers are cleaned up during failures #6778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 22, 2020

Conversation

srkukarni
Copy link
Contributor

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #

Motivation

In PulsarSource::open, if the sourcespec has multiple topics and if there is an exception creating a consumer for any one of them, the inputConsumers list is not initialized with those consumers who were successful. This means that subsequent close method doesn't cleanup those consumers leaving dangling consumers. This mr fixes that logic

Modifications

Describe the modifications you've done.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@srkukarni srkukarni requested review from merlimat and jerrypeng April 20, 2020 16:00
@srkukarni srkukarni self-assigned this Apr 20, 2020
@@ -80,17 +80,17 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
.messageListener(this);

if (conf.isRegexPattern) {
cb.topicsPattern(topic);
cb = cb.topicsPattern(topic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assignment is not needed, the builder would update in-place

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed for test, since otherwise the mock would return null

}

return cb.subscribeAsync();
}).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
Consumer<T> consumer = cb.subscribeAsync().join();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be simplified into:

Suggested change
Consumer<T> consumer = cb.subscribeAsync().join();
Consumer<T> consumer = cb.subscribe();

In any case, I think we should be catching any exception thrown over the loop and close down all the consumers that were already created and added to the list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current philosophy is that all cleaning up logic is inside the close which will get called as soon as there is an error. Shouldn't that suffice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client close? What wouldn't work when running in the thread mode, since the PulsarClient is shared.

@sijie sijie added this to the 2.6.0 milestone Apr 21, 2020
@srkukarni srkukarni merged commit 98b818b into apache:master Apr 22, 2020
@srkukarni srkukarni deleted the fix_dangling_consumers branch April 22, 2020 00:21
addisonj pushed a commit to instructure/pulsar that referenced this pull request May 7, 2020
jiazhai pushed a commit that referenced this pull request May 8, 2020
)

Co-authored-by: Sanjeev Kulkarni <[email protected]>(cherry picked from commit 98b818b)
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants