[C++] Fix paused zero queue consumer still pre-fetches messages #10036

Merged

BewareMyPower
Contributor

@BewareMyPower BewareMyPower commented Mar 25, 2021

Motivation

In the C++ client, a zero queue consumer (i.e. a consumer whose receiver queue size is 0) still pre-fetches messages after pauseMessageListener is called. This is because ConsumerImpl::increaseAvailablePermits doesn't check the boolean variable messageListenerRunning_, which becomes false after pauseMessageListener is called. Therefore, after the zero queue consumer is paused, it still sends a FLOW command to pre-fetch a message into its internal unbounded queue incomingMessages_.

This behavior can make some messages appear to be lost. For example, for a topic with 10 messages, start a shared consumer, consume 3 messages, and pause it. Then start another shared consumer on the same subscription. The new consumer will start from the 5th message because the 4th message is cached in the previous consumer.

Modifications

The refactoring changes are:

  • Change messageListenerRunning_'s type to std::atomic_int to avoid using a mutex.
  • Replace receiveMessages with sendFlowPermitsToBroker, which makes the method more readable. The ClientConnection checks now live inside the method, which removes a lot of debug logging code around its call sites.
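
As a rough illustration of the first bullet above, a flag stored in a std::atomic_int can be read and written without taking a lock. This is only a minimal sketch: ListenerState and its methods are hypothetical names, not the actual ConsumerImpl members.

```cpp
#include <atomic>

// Hypothetical stand-in for the listener state kept by a consumer.
// It is NOT the real ConsumerImpl; it only shows the lock-free flag idea.
class ListenerState {
   public:
    // Reads the flag atomically; no mutex is needed.
    bool isRunning() const { return running_.load() != 0; }

    // Writes are atomic as well, so pause/resume may race with isRunning().
    void pause() { running_.store(0); }
    void resume() { running_.store(1); }

   private:
    std::atomic_int running_{1};  // 1 = running, 0 = paused
};
```

A std::atomic<bool> would work equally well for this sketch; std::atomic_int is used here only because that is the type the PR description names.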

Based on the refactoring above, the key changes are:

  • Add a check for messageListenerRunning_ in the increaseAvailablePermits method, and make the implementation consistent with the Java client's ConsumerImpl#increaseAvailablePermits. The type of availablePermits_ is also changed to std::atomic_int.
  • Since a paused consumer no longer pre-fetches messages, add an increaseAvailablePermits invocation in resumeMessageListener to send a FLOW command after the consumer resumes.
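
The two key changes above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the code from this PR: the class PausableConsumerSketch, the refill threshold of 1, and the sentFlows() recorder are hypothetical, and the real methods also take a ClientConnection.

```cpp
#include <atomic>
#include <vector>

// Hypothetical, simplified model of a consumer with a pausable listener.
class PausableConsumerSketch {
   public:
    // Adds `delta` permits and, only while the listener is running,
    // flushes the accumulated permits to the broker as one FLOW request.
    void increaseAvailablePermits(int delta) {
        int permits = availablePermits_.fetch_add(delta) + delta;
        while (permits >= kRefillThreshold && messageListenerRunning_.load() != 0) {
            // On failure, compare_exchange_weak reloads `permits` and we retry.
            if (availablePermits_.compare_exchange_weak(permits, 0)) {
                sendFlowPermitsToBroker(permits);
                break;
            }
        }
    }

    void pauseMessageListener() { messageListenerRunning_.store(0); }

    void resumeMessageListener() {
        messageListenerRunning_.store(1);
        // Nothing was requested while paused, so flush pending permits now.
        increaseAvailablePermits(0);
    }

    // Records of the permit counts that would have been sent (for inspection).
    const std::vector<int>& sentFlows() const { return sentFlows_; }

   private:
    // Stand-in for sending a FLOW command; here it only records the count.
    void sendFlowPermitsToBroker(int permits) { sentFlows_.push_back(permits); }

    static constexpr int kRefillThreshold = 1;  // assumed value for the sketch
    std::atomic_int availablePermits_{0};
    std::atomic_int messageListenerRunning_{1};
    std::vector<int> sentFlows_;
};
```

In this model, a call to increaseAvailablePermits(1) made while paused only accumulates the permit; the FLOW request is deferred until resumeMessageListener is called.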

Finally, tests are added to verify AtomicHelper's utility functions and the expected behavior of a zero queue consumer.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Add ZeroQueueSizeTest.testPauseResume for the case described in the Motivation section. The test uses condition_variable::wait_for rather than Latch because, with the previous incorrect implementation, the condition might never be met and the wait needs to time out.
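
The timed wait mentioned above might look roughly like this. It is a minimal sketch, not the test from this PR: MessageCounter, onMessage, and waitForCount are hypothetical names chosen for illustration.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical test helper: counts received messages and lets a test wait,
// with a timeout, until an expected count is reached.
class MessageCounter {
   public:
    // Would be called from a message listener callback.
    void onMessage() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        cond_.notify_all();
    }

    // Returns true if at least `expected` messages arrived before the
    // timeout, false otherwise, so a test fails instead of hanging.
    bool waitForCount(int expected, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        return cond_.wait_for(lock, timeout, [&] { return count_ >= expected; });
    }

   private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_ = 0;
};
```

Because wait_for returns false on timeout, a test can assert on the result and report a failure when the expected message never arrives.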

@codelipenghui codelipenghui added the type/bug The PR fixed a bug or issue reported a bug label Mar 25, 2021
@codelipenghui codelipenghui added this to the 2.8.0 milestone Mar 25, 2021
@BewareMyPower
Contributor Author

There's a C++ compile error when building the pulsar image. It's weird that cpp-tests passed but the image build failed. I'll take a look.

@BewareMyPower
Contributor Author

/pulsarbot run-failure-checks

@merlimat merlimat merged commit fb6b4ea into apache:master Mar 26, 2021
@BewareMyPower BewareMyPower deleted the bewaremypower/cpp-zero-queue-consumer branch March 27, 2021 06:31
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Mar 30, 2021
codelipenghui pushed a commit that referenced this pull request Mar 30, 2021
* Fix zero queue consumer pre-fetches messages after paused

* Remove unused logs and checks

* Fix pulsar image build failure

* Fix AtomicHelper functions

* Use fetch_add as a substitute of addAndGet

(cherry picked from commit fb6b4ea)
Labels
cherry-picked/branch-2.7 Archived: 2.7 is end of life release/2.7.2 type/bug The PR fixed a bug or issue reported a bug