Skip to content

Pause/resume a consumer by topic #67

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions include/cppkafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,30 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
*/
void unassign();

/**
* \brief Pauses all consumption
*/
void pause();

/**
* \brief Pauses consumption from the given topic list
*
* \param List of topics
*/
void pause_topics(const std::vector<std::string>& topics);

/**
* \brief Resumes all consumption
*/
void resume();

/**
* \brief Resumes consumption from the given topic list
*
* \param List of topics
*/
void resume_topics(const std::vector<std::string>& topics);

/**
* \brief Commits the current partition assignment
*
Expand Down Expand Up @@ -364,6 +388,8 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque);
static TopicPartitionList get_matching_partitions(TopicPartitionList&& partitions,
const std::vector<std::string>& topics);

void close();
void commit(const Message& msg, bool async);
Expand Down
37 changes: 37 additions & 0 deletions src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*
*/
#include <sstream>
#include <algorithm>
#include <cctype>
#include "consumer.h"
#include "exceptions.h"
#include "logging.h"
Expand All @@ -39,6 +41,8 @@ using std::move;
using std::make_tuple;
using std::ostringstream;
using std::chrono::milliseconds;
using std::toupper;
using std::equal;

namespace cppkafka {

Expand All @@ -48,6 +52,23 @@ void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
static_cast<Consumer*>(opaque)->handle_rebalance(error, list);
}

TopicPartitionList Consumer::get_matching_partitions(TopicPartitionList&& partitions,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this an rvalue reference and not a const ref?

Copy link
Contributor Author

@accelerated accelerated May 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I move it internally. I don't need to make a copy. The return of get_assignement is passed-in directly as an rvalue. Unless you think you may need to use this function elsewhere in the class...?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see the move. But anyway, in that case it should be taken by value, not by rvalue ref. The value will be moved into this function so won't be making any copies.

const vector<string>& topics) {
TopicPartitionList matches;
for (const auto& topic : topics) {
for (auto& partition : partitions) {
bool match = equal(topic.begin(), topic.end(), partition.get_topic().begin(),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this blow up if topic.size() > partition.get_topic().size()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah looks like the signature was improved in C++17 with a fourth end iterator. I'll put a boundary check.

[](char c1, char c2)->bool {
return toupper(c1) == toupper(c2);
});
if (match) {
matches.emplace_back(move(partition));
}
}
}
return matches;
}

Consumer::Consumer(Configuration config)
: KafkaHandleBase(move(config)) {
char error_buffer[512];
Expand Down Expand Up @@ -125,6 +146,22 @@ void Consumer::unassign() {
check_error(error);
}

void Consumer::pause() {
pause_partitions(get_assignment());
}

void Consumer::pause_topics(const std::vector<string>& topics) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No std here please.

pause_partitions(get_matching_partitions(get_assignment(), topics));
}

void Consumer::resume() {
resume_partitions(get_assignment());
}

void Consumer::resume_topics(const std::vector<string>& topics) {
resume_partitions(get_matching_partitions(get_assignment(), topics));
}

void Consumer::commit() {
commit(nullptr, false);
}
Expand Down