Skip to content

Pause/resume a consumer by topic #67

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion include/cppkafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
*/
void unassign();

/**
* \brief Pauses all consumption
*/
void pause();

/**
* \brief Resumes all consumption
*/
void resume();

/**
* \brief Commits the current partition assignment
*
Expand Down Expand Up @@ -364,7 +374,6 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque);

void close();
void commit(const Message& msg, bool async);
void commit(const TopicPartitionList* topic_partitions, bool async);
Expand Down
10 changes: 10 additions & 0 deletions include/cppkafka/topic_partition_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_part
CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions);
CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle);

// Extracts a partition list subset belonging to the provided topics (case-insensitive)
CPPKAFKA_API TopicPartitionList make_subset(const TopicPartitionList& partitions,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is okay here but I'm not sure about the name, "make_subset" doesn't really explain what the function does. Maybe find_matches or something like that? As much as this does return a subset of the input, the important thing is how the subset is created and not the fact that it will return a subset.

const std::vector<std::string>& topics);

// Extracts a partition list subset belonging to the provided partition ids
// Note: this assumes that all topic partitions in the original list belong to the same topic
// otherwise the partition ids may not be unique
CPPKAFKA_API TopicPartitionList make_subset(const TopicPartitionList& partitions,
const std::vector<int>& ids);

CPPKAFKA_API std::ostream& operator<<(std::ostream& output, const TopicPartitionList& rhs);

} // cppkafka
Expand Down
12 changes: 12 additions & 0 deletions src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*
*/
#include <sstream>
#include <algorithm>
#include <cctype>
#include "consumer.h"
#include "exceptions.h"
#include "logging.h"
Expand All @@ -39,6 +41,8 @@ using std::move;
using std::make_tuple;
using std::ostringstream;
using std::chrono::milliseconds;
using std::toupper;
using std::equal;

namespace cppkafka {

Expand Down Expand Up @@ -125,6 +129,14 @@ void Consumer::unassign() {
check_error(error);
}

void Consumer::pause() {
pause_partitions(get_assignment());
}

void Consumer::resume() {
resume_partitions(get_assignment());
}

void Consumer::commit() {
commit(nullptr, false);
}
Expand Down
38 changes: 38 additions & 0 deletions src/topic_partition_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
*/

#include <iostream>
#include <string>
#include "topic_partition_list.h"
#include "topic_partition.h"
#include "exceptions.h"

using std::vector;
using std::ostream;
using std::string;

namespace cppkafka {

Expand Down Expand Up @@ -67,6 +69,42 @@ TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) {
return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy);
}

TopicPartitionList make_subset(const TopicPartitionList& partitions,
const vector<string>& topics) {
vector<bool> skip(partitions.size(), false);
TopicPartitionList subset;
for (const auto& topic : topics) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you flip this loop (first over partitions, then over topics), won't you get rid of the skip vector? Is that there in case there's duplicates in topics or am I missing something? Normally I'd use a set/unordered_set for topics as you want to enforce that there's actually no dupes (plus it makes the lookup much simpler once the loop is flipped). Also, even if this is a simple function, a simple test making sure it does what you want would be nice.

for (size_t i = 0; i < partitions.size(); ++i) {
if (!skip[i] && (topic.size() == partitions[i].get_topic().size())) {
// compare both strings
bool match = equal(topic.begin(), topic.end(), partitions[i].get_topic().begin(),
[](char c1, char c2)->bool {
return toupper(c1) == toupper(c2);
});
if (match) {
skip[i] = true;
subset.emplace_back(partitions[i]);
}
}
}
}
return subset;
}

TopicPartitionList make_subset(const TopicPartitionList& partitions,
const vector<int>& ids) {
TopicPartitionList subset;
for (const auto& id : ids) {
for (const auto& partition : partitions) {
// compare both partition ids
if (id == partition.get_partition()) {
subset.emplace_back(partition);
}
}
}
return subset;
}

ostream& operator<<(ostream& output, const TopicPartitionList& rhs) {
output << "[ ";
for (auto iter = rhs.begin(); iter != rhs.end(); ++iter) {
Expand Down