-
Notifications
You must be signed in to change notification settings - Fork 3.6k
[C++] Fix paused zero queue consumer still pre-fetches messages #10036
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
merlimat
merged 5 commits into
apache:master
from
BewareMyPower:bewaremypower/cpp-zero-queue-consumer
Mar 26, 2021
Merged
[C++] Fix paused zero queue consumer still pre-fetches messages #10036
merlimat
merged 5 commits into
apache:master
from
BewareMyPower:bewaremypower/cpp-zero-queue-consumer
Mar 26, 2021
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
There's a C++ compile problem during building pulsar image. It's weird that cpp-tests passed but image build failed. I'll take a look. |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
jiazhai
approved these changes
Mar 25, 2021
merlimat
reviewed
Mar 25, 2021
/pulsarbot run-failure-checks |
2 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
merlimat
approved these changes
Mar 26, 2021
codelipenghui
pushed a commit
that referenced
this pull request
Mar 30, 2021
* Fix zero queue consumer pre-fetches messages after paused * Remove unused logs and checks * Fix pulsar image build failure * Fix AtomicHelper functions * Use fetch_add as a substitute of addAndGet (cherry picked from commit fb6b4ea)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
cherry-picked/branch-2.7
Archived: 2.7 is end of life
release/2.7.2
type/bug
The PR fixed a bug or issue reported a bug
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Motivation
In C++ client, zero queue consumer (i.e. the consumer's receiver queue size is 0) will still pre-fetch messages after
pauseMessageListener
is called. It's becauseConsumerImpl::increaseAvailablePermits
doesn't check the boolean variablemessageListenerRunning_
, which becomes false afterpauseMessageListener
is called. Therefore, after the zero queue consumer is paused, it will still send FLOW command to pre-fetch a message to its internal unbounded queueincomingMessages_
.This behavior may cause some messages looks like being lost. For example, for a topic with 10 messages, start a shared consumer to consume 3 messages and pause. Then start another shared consumer to the same subscription. The new consumer will start from the 5th message because the 4th message is cached in the previous consumer.
Modifications
Here're the refactor changes:
messageListenerRunning_
's type tostd::atomic_int
to avoid usage of mutex.receiveMessages
tosendFlowPermitsToBroker
, which makes the method more readable and the checks forClientConnection
avoids lots of debug logs code outside the method invocation.Based on the refactors above, the key changes are:
messageListenerRunning_
inincreaseAvailablePermits
method, and make the implementation consistent with Java client'sConsumerImpl#increaseAvailablePermits
. Also the type ofavailablePermits_
is changed tostd::atomic_int
.pauseMessageListener
doesn't pre-fetch messages any more, add theincreaseAvailablePermits
invocation inresumeMessageListener
to send FLOW command after consumer resumes.Finally the tests are added to verify the
AtomicHelper
's util function and the expected behavior of zero queue consumer.Verifying this change
This change added tests and can be verified as follows:
ZeroQueueSizeTest.testPauseResume
for the case I've described in Motivation section before. Here we usecondition_variable::wait_for
method but notLatch
because the condition may never meet in the previous wrong implementation.