diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 5e6ab725..ad5bece7 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -193,6 +193,16 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { */ void unassign(); + /** + * \brief Pauses all consumption + */ + void pause(); + + /** + * \brief Resumes all consumption + */ + void resume(); + /** * \brief Commits the current partition assignment * @@ -364,7 +374,6 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { private: static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error, rd_kafka_topic_partition_list_t *partitions, void *opaque); - void close(); void commit(const Message& msg, bool async); void commit(const TopicPartitionList* topic_partitions, bool async); diff --git a/include/cppkafka/topic_partition_list.h b/include/cppkafka/topic_partition_list.h index 06f703c8..d07bb6ee 100644 --- a/include/cppkafka/topic_partition_list.h +++ b/include/cppkafka/topic_partition_list.h @@ -34,6 +34,7 @@ #include #include #include +#include #include #include "macros.h" @@ -54,6 +55,16 @@ CPPKAFKA_API TopicPartitionList convert(const TopicPartitionsListPtr& topic_part CPPKAFKA_API TopicPartitionList convert(rd_kafka_topic_partition_list_t* topic_partitions); CPPKAFKA_API TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle); +// Extracts a partition list subset belonging to the provided topics (case-insensitive) +CPPKAFKA_API TopicPartitionList find_matches(const TopicPartitionList& partitions, + const std::set& topics); + +// Extracts a partition list subset belonging to the provided partition ids +// Note: this assumes that all topic partitions in the original list belong to the same topic +// otherwise the partition ids may not be unique +CPPKAFKA_API TopicPartitionList find_matches(const TopicPartitionList& partitions, + const std::set& ids); + CPPKAFKA_API std::ostream& operator<<(std::ostream& output, const TopicPartitionList& rhs); } // cppkafka diff --git a/src/consumer.cpp b/src/consumer.cpp index 66f1803c..f250cbc4 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -27,6 +27,8 @@ * */ #include +#include +#include #include "consumer.h" #include "exceptions.h" #include "logging.h" @@ -39,6 +41,8 @@ using std::move; using std::make_tuple; using std::ostringstream; using std::chrono::milliseconds; +using std::toupper; +using std::equal; namespace cppkafka { @@ -125,6 +129,14 @@ void Consumer::unassign() { check_error(error); } +void Consumer::pause() { + pause_partitions(get_assignment()); +} + +void Consumer::resume() { + resume_partitions(get_assignment()); +} + void Consumer::commit() { commit(nullptr, false); } diff --git a/src/topic_partition_list.cpp b/src/topic_partition_list.cpp index 26a0c996..029fce9b 100644 --- a/src/topic_partition_list.cpp +++ b/src/topic_partition_list.cpp @@ -28,12 +28,15 @@ */ #include +#include #include "topic_partition_list.h" #include "topic_partition.h" #include "exceptions.h" using std::vector; +using std::set; using std::ostream; +using std::string; namespace cppkafka { @@ -67,6 +70,37 @@ TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) { return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy); } +TopicPartitionList find_matches(const TopicPartitionList& partitions, + const set& topics) { + TopicPartitionList subset; + for (const auto& partition : partitions) { + for (const auto& topic : topics) { + if (topic.size() == partition.get_topic().size()) { + // compare both strings + bool match = equal(topic.begin(), topic.end(), partition.get_topic().begin(), + [](char c1, char c2)->bool { + return toupper(c1) == toupper(c2); + }); + if (match) { + subset.emplace_back(partition); + } + } + } + } + return subset; +} + +TopicPartitionList find_matches(const TopicPartitionList& partitions, + const set& ids) { + TopicPartitionList subset; + for (const auto& partition : partitions) { + if (ids.count(partition.get_partition()) > 0) { + subset.emplace_back(partition); + } + } + return subset; +} + ostream& operator<<(ostream& output, const TopicPartitionList& rhs) { output << "[ "; for (auto iter = rhs.begin(); iter != rhs.end(); ++iter) { diff --git a/tests/topic_partition_list_test.cpp b/tests/topic_partition_list_test.cpp index f2170d79..1100cb81 100644 --- a/tests/topic_partition_list_test.cpp +++ b/tests/topic_partition_list_test.cpp @@ -4,6 +4,8 @@ #include "cppkafka/topic_partition.h" using std::ostringstream; +using std::set; +using std::string; using namespace cppkafka; @@ -42,3 +44,44 @@ TEST_CASE("topic partition list to string", "[topic_partition]") { output << list; CHECK(output.str() == "[ foo[-1:#], bar[2:#], foobar[3:4] ]"); } + +TEST_CASE("find matches by topic", "[topic_partition]") { + const TopicPartitionList list = { + { "foo", 0 }, + { "bar", 3 }, + { "fb", 1 }, + { "foo", 1 }, + { "fb", 2 }, + { "other", 1 }, + { "a", 1 } + }; + + const TopicPartitionList expected = { + { "foo", 0 }, + { "fb", 1 }, + { "foo", 1 }, + { "fb", 2 }, + }; + const TopicPartitionList subset = find_matches(list, set{"foo", "fb"}); + CHECK(subset == expected); +} + +TEST_CASE("find matches by id", "[topic_partition]") { + const TopicPartitionList list = { + { "foo", 2 }, + { "foo", 3 }, + { "foo", 4 }, + { "foo", 5 }, + { "foo", 6 }, + { "foo", 7 }, + { "foo", 8 } + }; + + const TopicPartitionList expected = { + { "foo", 2 }, + { "foo", 5 }, + { "foo", 8 }, + }; + const TopicPartitionList subset = find_matches(list, set{2,5,8}); + CHECK(subset == expected); +}