Skip to content

[C++] Fix paused zero queue consumer still pre-fetches messages #10036

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 21 additions & 36 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
// This is the initial capacity of the queue
incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)),
pendingReceives_(),
availablePermits_(conf.getReceiverQueueSize()),
availablePermits_(0),
receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2),
consumerId_(client->newConsumerId()),
consumerName_(config_.getConsumerName()),
partitionIndex_(-1),
Expand Down Expand Up @@ -202,9 +203,12 @@ void ConsumerImpl::connectionFailed(Result result) {
}
}

void ConsumerImpl::receiveMessages(const ClientConnectionPtr& cnx, unsigned int count) {
SharedBuffer cmd = Commands::newFlow(consumerId_, count);
cnx->sendCommand(cmd);
void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages) {
if (cnx) {
LOG_DEBUG(getName() << "Send more permits: " << numMessages);
SharedBuffer cmd = Commands::newFlow(consumerId_, static_cast<unsigned int>(numMessages));
cnx->sendCommand(cmd);
}
}

void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
Expand All @@ -223,17 +227,17 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
backoff_.reset();
// Complicated logic since we don't have a isLocked() function for mutex
if (waitingForZeroQueueSizeMessage) {
receiveMessages(cnx, 1);
sendFlowPermitsToBroker(cnx, 1);
}
availablePermits_ = 0;
}

LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
if (consumerTopicType_ == NonPartitioned || !firstTime) {
if (config_.getReceiverQueueSize() != 0) {
receiveMessages(cnx, config_.getReceiverQueueSize());
sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
} else if (messageListener_) {
receiveMessages(cnx, 1);
sendFlowPermitsToBroker(cnx, 1);
}
}
consumerCreatedPromise_.setValue(shared_from_this());
Expand Down Expand Up @@ -380,11 +384,9 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
}

if (messageListener_) {
Lock lock(messageListenerMutex_);
if (!messageListenerRunning_) {
return;
}
lock.unlock();
// Trigger message listener callback in a separate thread
while (numOfMessageReceived--) {
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this()));
Expand Down Expand Up @@ -556,11 +558,9 @@ void ConsumerImpl::discardCorruptedMessage(const ClientConnectionPtr& cnx,
}

void ConsumerImpl::internalListener() {
Lock lock(messageListenerMutex_);
if (!messageListenerRunning_) {
return;
}
lock.unlock();
Message msg;
if (!incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
// This will only happen when the connection got reset and we cleared the queue
Expand Down Expand Up @@ -596,10 +596,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
waitingForZeroQueueSizeMessage = true;
localLock.unlock();

if (currentCnx) {
LOG_DEBUG(getName() << "Send more permits: " << 1);
receiveMessages(currentCnx, 1);
}
sendFlowPermitsToBroker(currentCnx, 1);

while (true) {
incomingMessages_.pop(msg);
Expand Down Expand Up @@ -646,11 +643,7 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
lock.unlock();

if (config_.getReceiverQueueSize() == 0) {
ClientConnectionPtr currentCnx = getCnx().lock();
if (currentCnx) {
LOG_DEBUG(getName() << "Send more permits: " << 1);
receiveMessages(currentCnx, 1);
}
sendFlowPermitsToBroker(getCnx().lock(), 1);
}
}
}
Expand Down Expand Up @@ -752,20 +745,13 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
}
}

void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int numberOfPermits) {
int additionalPermits = 0;
void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta) {
int newAvailablePermits = availablePermits_.fetch_add(delta) + delta;

availablePermits_ += numberOfPermits;
if (availablePermits_ >= config_.getReceiverQueueSize() / 2) {
additionalPermits = availablePermits_;
availablePermits_ = 0;
}
if (additionalPermits > 0) {
if (currentCnx) {
LOG_DEBUG(getName() << "Send more permits: " << additionalPermits);
receiveMessages(currentCnx, additionalPermits);
} else {
LOG_DEBUG(getName() << "Connection is not ready, Unable to send flow Command");
while (newAvailablePermits >= receiverQueueRefillThreshold_ && messageListenerRunning_) {
if (availablePermits_.compare_exchange_weak(newAvailablePermits, 0)) {
sendFlowPermitsToBroker(currentCnx, newAvailablePermits);
break;
}
}
}
Expand Down Expand Up @@ -972,7 +958,6 @@ Result ConsumerImpl::pauseMessageListener() {
if (!messageListener_) {
return ResultInvalidConfiguration;
}
Lock lock(messageListenerMutex_);
messageListenerRunning_ = false;
return ResultOk;
}
Expand All @@ -982,19 +967,19 @@ Result ConsumerImpl::resumeMessageListener() {
return ResultInvalidConfiguration;
}

Lock lock(messageListenerMutex_);
if (messageListenerRunning_) {
// Not paused
return ResultOk;
}
messageListenerRunning_ = true;
const size_t count = incomingMessages_.size();
lock.unlock();

for (size_t i = 0; i < count; i++) {
// Trigger message listener callback in a separate thread
listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this()));
}
// Check current permits and determine whether to send FLOW command
this->increaseAvailablePermits(getCnx().lock(), 0);
return ResultOk;
}

Expand Down
12 changes: 6 additions & 6 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <lib/stats/ConsumerStatsImpl.h>
#include <lib/stats/ConsumerStatsDisabled.h>
#include <queue>
#include <atomic>

using namespace pulsar;

Expand Down Expand Up @@ -73,11 +74,10 @@ class ConsumerImpl : public ConsumerImplBase,
~ConsumerImpl();
void setPartitionIndex(int partitionIndex);
int getPartitionIndex();
void receiveMessages(const ClientConnectionPtr& cnx, unsigned int count);
void sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages);
uint64_t getConsumerId();
void messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
bool& isChecksumValid, proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
int incrementAndGetPermits(uint64_t cnxSequenceId);
void messageProcessed(Message& msg);
inline proto::CommandSubscribe_SubType getSubType();
inline proto::CommandSubscribe_InitialPosition getInitialPosition();
Expand Down Expand Up @@ -150,7 +150,7 @@ class ConsumerImpl : public ConsumerImplBase,
const proto::MessageMetadata& metadata, SharedBuffer& payload);
void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
proto::CommandAck::ValidationError validationError);
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int numberOfPermits = 1);
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
void drainIncomingMessageQueue(size_t count);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
int redeliveryCount);
Expand Down Expand Up @@ -185,14 +185,14 @@ class ConsumerImpl : public ConsumerImplBase,
Optional<MessageId> lastDequedMessage_;
UnboundedBlockingQueue<Message> incomingMessages_;
std::queue<ReceiveCallback> pendingReceives_;
int availablePermits_;
std::atomic_int availablePermits_;
const int receiverQueueRefillThreshold_;
uint64_t consumerId_;
std::string consumerName_;
std::string consumerStr_;
int32_t partitionIndex_;
Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
bool messageListenerRunning_;
std::mutex messageListenerMutex_;
std::atomic_bool messageListenerRunning_;
CompressionCodecProvider compressionCodecProvider_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ void MultiTopicsConsumerImpl::receiveMessages() {
for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
consumer++) {
ConsumerImplPtr consumerPtr = consumer->second;
consumerPtr->receiveMessages(consumerPtr->getCnx().lock(), conf_.getReceiverQueueSize());
consumerPtr->sendFlowPermitsToBroker(consumerPtr->getCnx().lock(), conf_.getReceiverQueueSize());
LOG_DEBUG("Sending FLOW command for consumer - " << consumerPtr->getConsumerId());
}
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ void PartitionedConsumerImpl::internalListener(Consumer consumer) {
void PartitionedConsumerImpl::receiveMessages() {
for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
ConsumerImplPtr consumer = *i;
consumer->receiveMessages(consumer->getCnx().lock(), conf_.getReceiverQueueSize());
consumer->sendFlowPermitsToBroker(consumer->getCnx().lock(), conf_.getReceiverQueueSize());
LOG_DEBUG("Sending FLOW command for consumer - " << consumer->getConsumerId());
}
}
Expand Down
107 changes: 107 additions & 0 deletions pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
#include <pulsar/Client.h>
#include <lib/Latch.h>
#include "ConsumerTest.h"
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>

DECLARE_LOG_OBJECT()

Expand Down Expand Up @@ -116,3 +121,105 @@ TEST(ZeroQueueSizeTest, testMessageListener) {
producer.close();
client.close();
}

static ConsumerConfiguration zeroQueueSharedConsumerConf(
const std::string& name, std::function<void(Consumer, const Message&)> callback) {
ConsumerConfiguration conf;
conf.setConsumerType(ConsumerShared);
conf.setReceiverQueueSize(0);
conf.setSubscriptionInitialPosition(InitialPositionEarliest);
conf.setMessageListener([name, callback](Consumer consumer, const Message& msg) {
LOG_INFO(name << " received " << msg.getDataAsString() << " from " << msg.getMessageId());
callback(consumer, msg);
});
return conf;
}

class IntVector {
public:
size_t add(int i) {
std::lock_guard<std::mutex> lock(mutex_);
data_.emplace_back(i);
return data_.size();
}

std::vector<int> data() const {
std::lock_guard<std::mutex> lock(mutex_);
return data_;
}

private:
std::vector<int> data_;
mutable std::mutex mutex_;
};

TEST(ZeroQueueSizeTest, testPauseResume) {
Client client(lookupUrl);
const auto topic = "ZeroQueueSizeTestPauseListener-" + std::to_string(time(nullptr));
const auto subscription = "my-sub";

auto intToMessage = [](int i) { return MessageBuilder().setContent(std::to_string(i)).build(); };
auto messageToInt = [](const Message& msg) { return std::stoi(msg.getDataAsString()); };

// 1. Produce 10 messages
Producer producer;
const auto producerConf = ProducerConfiguration().setBatchingEnabled(false);
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
for (int i = 0; i < 10; i++) {
MessageId id;
ASSERT_EQ(ResultOk, producer.send(intToMessage(i), id));
LOG_INFO("Send " << i << " to " << id);
}

// 2. consumer-1 receives 1 message and pause
std::mutex mtx;
std::condition_variable condConsumer1FirstMessage;
std::condition_variable condConsumer1Completed;
IntVector messages1;
const auto conf1 = zeroQueueSharedConsumerConf("consumer-1", [&](Consumer consumer, const Message& msg) {
const auto numReceived = messages1.add(messageToInt(msg));
if (numReceived == 1) {
ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
condConsumer1FirstMessage.notify_all();
} else if (numReceived == 5) {
ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
condConsumer1Completed.notify_all();
}
});
Consumer consumer1;
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf1, consumer1));
{
std::unique_lock<std::mutex> lock(mtx);
ASSERT_EQ(condConsumer1FirstMessage.wait_for(lock, std::chrono::seconds(3)),
std::cv_status::no_timeout);
ASSERT_EQ(messages1.data(), (std::vector<int>{0}));
}

// 3. consumer-2 receives 5 messages and pause
std::condition_variable condConsumer2Completed;
IntVector messages2;
const auto conf2 = zeroQueueSharedConsumerConf("consumer-2", [&](Consumer consumer, const Message& msg) {
const int numReceived = messages2.add(messageToInt(msg));
if (numReceived == 5) {
ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
condConsumer2Completed.notify_all();
}
});
Consumer consumer2;
ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf2, consumer2));
{
std::unique_lock<std::mutex> lock(mtx);
ASSERT_EQ(condConsumer2Completed.wait_for(lock, std::chrono::seconds(3)), std::cv_status::no_timeout);
ASSERT_EQ(messages2.data(), (std::vector<int>{1, 2, 3, 4, 5}));
}

// 4. consumer-1 resumes listening, and receives last 4 messages
ASSERT_EQ(ResultOk, consumer1.resumeMessageListener());
{
std::unique_lock<std::mutex> lock(mtx);
ASSERT_EQ(condConsumer1Completed.wait_for(lock, std::chrono::seconds(3)), std::cv_status::no_timeout);
ASSERT_EQ(messages1.data(), (std::vector<int>{0, 6, 7, 8, 9}));
}

client.close();
}