Skip to content

Commit 4df5cab

Browse files
[ResponsePublisher] add pipeline support (sonic-net#2511)
* [ResponsePublisher] add pipeline support Why I did it I did it to improve performance when sending many responses. Responses are buffered in redis client before beeing sent out to redis server, while orchagent has no more pending tasks to do, responses are flushed to redis.
1 parent 44ea6a0 commit 4df5cab

File tree

9 files changed

+117
-24
lines changed

9 files changed

+117
-24
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ swssconfig/swssplayer
7474
tlm_teamd/tlm_teamd
7575
teamsyncd/teamsyncd
7676
tests/tests
77+
tests/mock_tests/tests_response_publisher
7778
tests/mock_tests/tests_fpmsyncd
7879

7980

orchagent/orch.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,11 @@ void Orch::dumpPendingTasks(vector<string> &ts)
562562
}
563563
}
564564

565+
void Orch::flushResponses()
566+
{
567+
m_publisher.flush();
568+
}
569+
565570
void Orch::logfileReopen()
566571
{
567572
gRecordOfs.close();

orchagent/orch.h

+5
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,11 @@ class Orch
223223
static void recordTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple);
224224

225225
void dumpPendingTasks(std::vector<std::string> &ts);
226+
227+
/**
228+
* @brief Flush pending responses
229+
*/
230+
void flushResponses();
226231
protected:
227232
ConsumerMap m_consumerMap;
228233

orchagent/orchdaemon.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,11 @@ void OrchDaemon::flush()
677677
SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status);
678678
handleSaiFailure(true);
679679
}
680+
681+
for (auto* orch: m_orchList)
682+
{
683+
orch->flushResponses();
684+
}
680685
}
681686

682687
/* Release the file handle so the log can be rotated */

orchagent/response_publisher.cpp

+22-15
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,10 @@ void RecordResponse(const std::string &response_channel, const std::string &key,
9090

9191
} // namespace
9292

93-
ResponsePublisher::ResponsePublisher() : m_db("APPL_STATE_DB", 0)
93+
ResponsePublisher::ResponsePublisher(bool buffered) :
94+
m_db(std::make_unique<swss::DBConnector>("APPL_STATE_DB", 0)),
95+
m_pipe(std::make_unique<swss::RedisPipeline>(m_db.get())),
96+
m_buffered(buffered)
9497
{
9598
}
9699

@@ -107,17 +110,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
107110
}
108111

109112
std::string response_channel = "APPL_DB_" + table + "_RESPONSE_CHANNEL";
110-
if (m_notifiers.find(table) == m_notifiers.end())
111-
{
112-
m_notifiers[table] = std::make_unique<swss::NotificationProducer>(&m_db, response_channel);
113-
}
113+
swss::NotificationProducer notificationProducer{m_pipe.get(), response_channel, m_buffered};
114114

115115
auto intent_attrs_copy = intent_attrs;
116116
// Add error message as the first field-value-pair.
117117
swss::FieldValueTuple err_str("err_str", PrependedComponent(status) + status.message());
118118
intent_attrs_copy.insert(intent_attrs_copy.begin(), err_str);
119119
// Sends the response to the notification channel.
120-
m_notifiers[table]->send(status.codeStr(), key, intent_attrs_copy);
120+
notificationProducer.send(status.codeStr(), key, intent_attrs_copy);
121121
RecordResponse(response_channel, key, intent_attrs_copy, status.codeStr());
122122
}
123123

@@ -140,17 +140,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
140140
void ResponsePublisher::writeToDB(const std::string &table, const std::string &key,
141141
const std::vector<swss::FieldValueTuple> &values, const std::string &op, bool replace)
142142
{
143-
if (m_tables.find(table) == m_tables.end())
144-
{
145-
m_tables[table] = std::make_unique<swss::Table>(&m_db, table);
146-
}
143+
swss::Table applStateTable{m_pipe.get(), table, m_buffered};
147144

148145
auto attrs = values;
149146
if (op == SET_COMMAND)
150147
{
151148
if (replace)
152149
{
153-
m_tables[table]->del(key);
150+
applStateTable.del(key);
154151
}
155152
if (!values.size())
156153
{
@@ -160,9 +157,9 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
160157
// Write to DB only if the key does not exist or non-NULL attributes are
161158
// being written to the entry.
162159
std::vector<swss::FieldValueTuple> fv;
163-
if (!m_tables[table]->get(key, fv))
160+
if (!applStateTable.get(key, fv))
164161
{
165-
m_tables[table]->set(key, attrs);
162+
applStateTable.set(key, attrs);
166163
RecordDBWrite(table, key, attrs, op);
167164
return;
168165
}
@@ -179,13 +176,23 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
179176
}
180177
if (attrs.size())
181178
{
182-
m_tables[table]->set(key, attrs);
179+
applStateTable.set(key, attrs);
183180
RecordDBWrite(table, key, attrs, op);
184181
}
185182
}
186183
else if (op == DEL_COMMAND)
187184
{
188-
m_tables[table]->del(key);
185+
applStateTable.del(key);
189186
RecordDBWrite(table, key, {}, op);
190187
}
191188
}
189+
190+
void ResponsePublisher::flush()
191+
{
192+
m_pipe->flush();
193+
}
194+
195+
void ResponsePublisher::setBuffered(bool buffered)
196+
{
197+
m_buffered = buffered;
198+
}

orchagent/response_publisher.h

+18-6
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
class ResponsePublisher : public ResponsePublisherInterface
1717
{
1818
public:
19-
explicit ResponsePublisher();
19+
explicit ResponsePublisher(bool buffered = false);
20+
2021
virtual ~ResponsePublisher() = default;
2122

2223
// Intent attributes are the attributes sent in the notification into the
@@ -42,10 +43,21 @@ class ResponsePublisher : public ResponsePublisherInterface
4243
void writeToDB(const std::string &table, const std::string &key, const std::vector<swss::FieldValueTuple> &values,
4344
const std::string &op, bool replace = false) override;
4445

46+
/**
47+
* @brief Flush pending responses
48+
*/
49+
void flush();
50+
51+
/**
52+
* @brief Set buffering mode
53+
*
54+
* @param buffered Flag whether responses are buffered
55+
*/
56+
void setBuffered(bool buffered);
57+
4558
private:
46-
swss::DBConnector m_db;
47-
// Maps table names to tables.
48-
std::unordered_map<std::string, std::unique_ptr<swss::Table>> m_tables;
49-
// Maps table names to notifiers.
50-
std::unordered_map<std::string, std::unique_ptr<swss::NotificationProducer>> m_notifiers;
59+
std::unique_ptr<swss::DBConnector> m_db;
60+
std::unique_ptr<swss::RedisPipeline> m_pipe;
61+
62+
bool m_buffered{false};
5163
};

tests/mock_tests/Makefile.am

+19-2
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ P4_ORCH_DIR = $(top_srcdir)/orchagent/p4orch
44

55
CFLAGS_SAI = -I /usr/include/sai
66

7-
TESTS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd
7+
TESTS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd tests_response_publisher
88

9-
noinst_PROGRAMS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd
9+
noinst_PROGRAMS = tests tests_intfmgrd tests_portsyncd tests_fpmsyncd tests_response_publisher
1010

1111
LDADD_SAI = -lsaimeta -lsaimetadata -lsaivs -lsairedis
1212

@@ -183,3 +183,20 @@ tests_fpmsyncd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST
183183
tests_fpmsyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_fpmsyncd_INCLUDES)
184184
tests_fpmsyncd_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \
185185
-lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread -lgmock -lgmock_main
186+
187+
## response publisher unit tests
188+
189+
tests_response_publisher_SOURCES = response_publisher/response_publisher_ut.cpp \
190+
$(top_srcdir)/orchagent/response_publisher.cpp \
191+
mock_orchagent_main.cpp \
192+
mock_dbconnector.cpp \
193+
mock_table.cpp \
194+
mock_hiredis.cpp \
195+
mock_redisreply.cpp
196+
197+
tests_response_publisher_INCLUDES = $(tests_INCLUDES)
198+
tests_response_publisher_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI)
199+
tests_response_publisher_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(CFLAGS_SAI) $(tests_response_publisher_INCLUDES)
200+
tests_response_publisher_LDADD = $(LDADD_GTEST) $(LDADD_SAI) -lnl-genl-3 -lhiredis -lhiredis \
201+
-lswsscommon -lswsscommon -lgtest -lgtest_main -lzmq -lnl-3 -lnl-route-3 -lpthread
202+

tests/mock_tests/fake_response_publisher.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
#include "response_publisher.h"
55

6-
ResponsePublisher::ResponsePublisher() : m_db("APPL_STATE_DB", 0) {}
6+
ResponsePublisher::ResponsePublisher(bool buffered) : m_db(std::make_unique<swss::DBConnector>("APPL_STATE_DB", 0)), m_buffered(buffered) {}
77

88
void ResponsePublisher::publish(
99
const std::string& table, const std::string& key,
@@ -20,3 +20,7 @@ void ResponsePublisher::writeToDB(
2020
const std::string& table, const std::string& key,
2121
const std::vector<swss::FieldValueTuple>& values, const std::string& op,
2222
bool replace) {}
23+
24+
void ResponsePublisher::flush() {}
25+
26+
void ResponsePublisher::setBuffered(bool buffered) {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#include "response_publisher.h"
2+
3+
#include <gtest/gtest.h>
4+
5+
bool gResponsePublisherRecord{false};
6+
bool gResponsePublisherLogRotate{false};
7+
std::ofstream gResponsePublisherRecordOfs;
8+
std::string gResponsePublisherRecordFile;
9+
10+
using namespace swss;
11+
12+
TEST(ResponsePublisher, TestPublish)
13+
{
14+
DBConnector conn{"APPL_STATE_DB", 0};
15+
Table stateTable{&conn, "SOME_TABLE"};
16+
std::string value;
17+
ResponsePublisher publisher{};
18+
19+
publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS));
20+
ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value));
21+
ASSERT_EQ(value, "value");
22+
}
23+
24+
TEST(ResponsePublisher, TestPublishBuffered)
25+
{
26+
DBConnector conn{"APPL_STATE_DB", 0};
27+
Table stateTable{&conn, "SOME_TABLE"};
28+
std::string value;
29+
ResponsePublisher publisher{};
30+
31+
publisher.setBuffered(true);
32+
33+
publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS));
34+
publisher.flush();
35+
ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value));
36+
ASSERT_EQ(value, "value");
37+
}

0 commit comments

Comments
 (0)