-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Ensure that all dangling consumers are cleaned up during failures #6778
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -80,17 +80,17 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws | |||
.messageListener(this); | |||
|
|||
if (conf.isRegexPattern) { | |||
cb.topicsPattern(topic); | |||
cb = cb.topicsPattern(topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assignment is not needed, the builder would update in-place
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is needed for test, since otherwise the mock would return null
} | ||
|
||
return cb.subscribeAsync(); | ||
}).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()); | ||
Consumer<T> consumer = cb.subscribeAsync().join(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be simplified into:
Consumer<T> consumer = cb.subscribeAsync().join(); | |
Consumer<T> consumer = cb.subscribe(); |
In any case, I think we should be catching any exception thrown over the loop and close down all the consumers that were already created and added to the list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current philosophy is that all cleaning up logic is inside the close which will get called as soon as there is an error. Shouldn't that suffice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The client close? What wouldn't work when running in the thread mode, since the PulsarClient
is shared.
…ache#6778) Co-authored-by: Sanjeev Kulkarni <[email protected]>
) Co-authored-by: Sanjeev Kulkarni <[email protected]>(cherry picked from commit 98b818b)
…ache#6778) Co-authored-by: Sanjeev Kulkarni <[email protected]>
(If this PR fixes a github issue, please add
Fixes #<xyz>
.)Fixes #
(or if this PR is one task of a github issue, please add
Master Issue: #<xyz>
to link to the master issue.)Master Issue: #
Motivation
In PulsarSource::open, if the sourcespec has multiple topics and if there is an exception creating a consumer for any one of them, the inputConsumers list is not initialized with those consumers who were successful. This means that subsequent close method doesn't cleanup those consumers leaving dangling consumers. This mr fixes that logic
Modifications
Describe the modifications you've done.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation