From e6fa8ccc6d2acdc76899430d45dbbee9382bb13c Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Mon, 17 Oct 2022 09:29:14 +0000 Subject: [PATCH 1/5] [notifications] use redis pipeline Signed-off-by: Stepan Blyschak --- common/notificationproducer.cpp | 26 +++++++++++++++-- common/notificationproducer.h | 13 ++++++++- common/schema.h | 2 ++ configure.ac | 2 +- tests/ntf_ut.cpp | 50 +++++++++++++++++++++++++++++++++ 5 files changed, 89 insertions(+), 4 deletions(-) diff --git a/common/notificationproducer.cpp b/common/notificationproducer.cpp index 65587ffca..0e9bd3b71 100644 --- a/common/notificationproducer.cpp +++ b/common/notificationproducer.cpp @@ -1,7 +1,14 @@ #include "notificationproducer.h" +#include + swss::NotificationProducer::NotificationProducer(swss::DBConnector *db, const std::string &channel): - m_db(db), m_channel(channel) + m_ownedpipe(std::make_unique(db, 1)), m_pipe(m_ownedpipe.get()), m_channel(channel), m_buffered(false) +{ +} + +swss::NotificationProducer::NotificationProducer(swss::RedisPipeline *pipeline, const std::string &channel, bool buffered): + m_pipe(pipeline), m_channel(channel), m_buffered(buffered) { } @@ -19,5 +26,20 @@ int64_t swss::NotificationProducer::send(const std::string &op, const std::strin SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str()); - return m_db->publish(m_channel, msg); + RedisCommand command; + command.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str()); + + + if (m_buffered) + { + m_pipe->push(command, REDIS_REPLY_INTEGER); + + // if operating in buffered mode return -1 as we can't know the number + // of client's that have read the message immediately + return -1; + } + + RedisReply reply = m_pipe->push(command); + reply.checkReplyType(REDIS_REPLY_INTEGER); + return reply.getReply(); } diff --git a/common/notificationproducer.h b/common/notificationproducer.h index 216a46d30..96f962cc4 100644 --- a/common/notificationproducer.h +++ b/common/notificationproducer.h @@ -9,6 +9,7 @@ #include "logger.h" #include "table.h" #include "redisreply.h" +#include "redispipeline.h" #include "json.h" namespace swss { @@ -18,6 +19,14 @@ class NotificationProducer public: NotificationProducer(swss::DBConnector *db, const std::string &channel); + /** + * @brief Create NotificationProducer using RedisPipeline + * @param pipeline Pointer to RedisPipeline + * @param channel Channel name + * @param buffered Whether NotificationProducer will work in buffered mode + */ + NotificationProducer(RedisPipeline *pipeline, const std::string &channel, bool buffered = false); + // Returns: the number of clients that received the message int64_t send(const std::string &op, const std::string &data, std::vector &values); @@ -26,8 +35,10 @@ class NotificationProducer NotificationProducer(const NotificationProducer &other); NotificationProducer& operator = (const NotificationProducer &other); - swss::DBConnector *m_db; + std::unique_ptr m_ownedpipe{}; + RedisPipeline *m_pipe; std::string m_channel; + bool m_buffered{false}; }; } diff --git a/common/schema.h b/common/schema.h index 73fb3b94f..c94273bed 100644 --- a/common/schema.h +++ b/common/schema.h @@ -421,6 +421,8 @@ namespace swss { #define CFG_FLOW_COUNTER_ROUTE_PATTERN_TABLE_NAME "FLOW_COUNTER_ROUTE_PATTERN" +#define CFG_APP_STATE_LOGGING "APP_STATE_LOGGING" + /***** STATE DATABASE *****/ #define STATE_SWITCH_CAPABILITY_TABLE_NAME "SWITCH_CAPABILITY_TABLE" diff --git a/configure.ac b/configure.ac index eda00eaeb..37133246e 100644 --- a/configure.ac +++ b/configure.ac @@ -50,7 +50,7 @@ AC_PATH_PROGS(SWIG, [swig4.0 swig3.0 swig]) CFLAGS_COMMON="" CFLAGS_COMMON+=" -ansi" CFLAGS_COMMON+=" -fPIC" -CFLAGS_COMMON+=" -std=c++11" +CFLAGS_COMMON+=" -std=c++14" CFLAGS_COMMON+=" -Wall" CFLAGS_COMMON+=" -Wcast-align" CFLAGS_COMMON+=" -Wcast-qual" diff --git a/tests/ntf_ut.cpp b/tests/ntf_ut.cpp index 59bdac2bc..cd563c500 100644 --- a/tests/ntf_ut.cpp +++ b/tests/ntf_ut.cpp @@ -164,3 +164,53 @@ TEST(Notifications, peek) int rc = nc.peek(); EXPECT_EQ(rc, 0); } + +TEST(Notifications, pipelineProducer) +{ + SWSS_LOG_ENTER(); + + swss::DBConnector dbNtf("ASIC_DB", 0, true); + swss::RedisPipeline pipeline{&dbNtf}; + swss::NotificationConsumer nc(&dbNtf, "NOTIFICATIONS", 100, (size_t)10); + const bool buffered = true; + swss::NotificationProducer notifications(&pipeline, "NOTIFICATIONS", buffered); + + std::vector entry; + for(int i = 0; i < messages; i++) + { + auto s = std::to_string(i+1); + auto sentClients = notifications.send("ntf", s, entry); + // In buffered mode we get -1 in return + EXPECT_EQ(sentClients, -1); + } + + // Flush the pipeline + pipeline.flush(); + + // Pop all the notifications + std::deque vkco; + size_t popped = 0; + size_t npop = 10000; + int collected = 0; + while(nc.peek() > 0 && popped < npop) + { + nc.pops(vkco); + popped += vkco.size(); + + for (auto& kco : vkco) + { + collected++; + auto data = kfvKey(kco); + auto op = kfvOp(kco); + + EXPECT_EQ(op, "ntf"); + int i = stoi(data); + EXPECT_EQ(i, collected); + } + } + EXPECT_EQ(popped, (size_t)messages); + + // Peek and get nothing more + int rc = nc.peek(); + EXPECT_EQ(rc, 0); +} \ No newline at end of file From f00881e4c4f30f0714f6605d24c5aa8ba211933f Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Fri, 21 Oct 2022 08:51:04 +0000 Subject: [PATCH 2/5] remove schema changes Signed-off-by: Stepan Blyschak --- common/schema.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/common/schema.h b/common/schema.h index c94273bed..73fb3b94f 100644 --- a/common/schema.h +++ b/common/schema.h @@ -421,8 +421,6 @@ namespace swss { #define CFG_FLOW_COUNTER_ROUTE_PATTERN_TABLE_NAME "FLOW_COUNTER_ROUTE_PATTERN" -#define CFG_APP_STATE_LOGGING "APP_STATE_LOGGING" - /***** STATE DATABASE *****/ #define STATE_SWITCH_CAPABILITY_TABLE_NAME "SWITCH_CAPABILITY_TABLE" From f4553628ded71b4f8f8fad19d1daa00391fb2f12 Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Fri, 21 Oct 2022 08:54:20 +0000 Subject: [PATCH 3/5] style fixes Signed-off-by: Stepan Blyschak --- common/notificationproducer.cpp | 1 - tests/ntf_ut.cpp | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/notificationproducer.cpp b/common/notificationproducer.cpp index 0e9bd3b71..571555796 100644 --- a/common/notificationproducer.cpp +++ b/common/notificationproducer.cpp @@ -29,7 +29,6 @@ int64_t swss::NotificationProducer::send(const std::string &op, const std::strin RedisCommand command; command.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str()); - if (m_buffered) { m_pipe->push(command, REDIS_REPLY_INTEGER); diff --git a/tests/ntf_ut.cpp b/tests/ntf_ut.cpp index cd563c500..6a4670188 100644 --- a/tests/ntf_ut.cpp +++ b/tests/ntf_ut.cpp @@ -213,4 +213,5 @@ TEST(Notifications, pipelineProducer) // Peek and get nothing more int rc = nc.peek(); EXPECT_EQ(rc, 0); -} \ No newline at end of file +} + From fa1ca24704db8938a31f6e2b2eded8914dbf74a2 Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Thu, 10 Nov 2022 11:21:59 +0000 Subject: [PATCH 4/5] make a define for buffer size Signed-off-by: Stepan Blyschak --- common/notificationproducer.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/notificationproducer.cpp b/common/notificationproducer.cpp index 571555796..1e699ea97 100644 --- a/common/notificationproducer.cpp +++ b/common/notificationproducer.cpp @@ -1,9 +1,9 @@ #include "notificationproducer.h" -#include +#define NON_BUFFERED_COMMAND_BUFFER_SIZE 1 swss::NotificationProducer::NotificationProducer(swss::DBConnector *db, const std::string &channel): - m_ownedpipe(std::make_unique(db, 1)), m_pipe(m_ownedpipe.get()), m_channel(channel), m_buffered(false) + m_ownedpipe(std::make_unique(db, NON_BUFFERED_COMMAND_BUFFER_SIZE)), m_pipe(m_ownedpipe.get()), m_channel(channel), m_buffered(false) { } From 102acb6bb2aa9ccb8c9e3b50922fd901747875fa Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Tue, 29 Nov 2022 16:03:08 +0000 Subject: [PATCH 5/5] use 14 standard in BUILD Signed-off-by: Stepan Blyschak --- BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/BUILD b/BUILD index 15b443b08..480c0db5a 100644 --- a/BUILD +++ b/BUILD @@ -10,6 +10,7 @@ cc_library( "common/*.hpp", ]), copts = [ + "-std=c++14", "-I/usr/include/libnl3", # Expected location in the SONiC build container" ], includes = [