Skip to content

Commit 487d8a8

Browse files
qiluo-msftkktheballer
authored andcommitted
Replace RedisClient with DBConnector (sonic-net#1439)
After this PR: sonic-net/sonic-swss-common#382
1 parent 06229ef commit 487d8a8

File tree

9 files changed

+1000
-24
lines changed

9 files changed

+1000
-24
lines changed

mclagsyncd/mclaglink.cpp

Lines changed: 739 additions & 0 deletions
Large diffs are not rendered by default.

mclagsyncd/mclaglink.h

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/* Copyright(c) 2016-2019 Nephos.
2+
*
3+
* This program is free software; you can redistribute it and/or modify it
4+
* under the terms and conditions of the GNU General Public License,
5+
* version 2, as published by the Free Software Foundation.
6+
*
7+
* This program is distributed in the hope it will be useful, but WITHOUT
8+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
9+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
10+
* more details.
11+
*
12+
* You should have received a copy of the GNU General Public License along with
13+
* this program; if not, see <http://www.gnu.org/licenses/>.
14+
*
15+
* The full GNU General Public License is included in this distribution in
16+
* the file called "COPYING".
17+
*
18+
* Maintainer: Jim Jiang from nephos
19+
*/
20+
21+
#ifndef __MCLAGLINK__
22+
#define __MCLAGLINK__
23+
24+
#include <arpa/inet.h>
25+
#include <sys/socket.h>
26+
#include <linux/netlink.h>
27+
#include <linux/rtnetlink.h>
28+
29+
#include <errno.h>
30+
#include <assert.h>
31+
#include <unistd.h>
32+
#include <exception>
33+
#include <string>
34+
#include <map>
35+
#include <set>
36+
37+
#include "producerstatetable.h"
38+
#include "selectable.h"
39+
#include "redisclient.h"
40+
#include "mclagsyncd/mclag.h"
41+
42+
namespace swss {
43+
44+
#define ETHER_ADDR_STR_LEN 18
45+
#define MAX_L_PORT_NAME 20
46+
47+
struct mclag_fdb_info
48+
{
49+
char mac[ETHER_ADDR_STR_LEN];
50+
unsigned int vid;
51+
char port_name[MAX_L_PORT_NAME];
52+
short type; /*dynamic or static*/
53+
short op_type; /*add or del*/
54+
};
55+
56+
struct mclag_fdb
57+
{
58+
std::string mac;
59+
unsigned int vid;
60+
std::string port_name;
61+
std::string type;/*dynamic or static*/
62+
63+
mclag_fdb(std::string val_mac, unsigned int val_vid, std::string val_pname,
64+
std::string val_type) : mac(val_mac), vid(val_vid), port_name(val_pname), type(val_type)
65+
{
66+
}
67+
mclag_fdb()
68+
{
69+
}
70+
71+
bool operator <(const mclag_fdb &fdb) const
72+
{
73+
if (mac != fdb.mac)
74+
return mac < fdb.mac;
75+
else if (vid != fdb.vid)
76+
return vid < fdb.vid;
77+
else
78+
return port_name < fdb.port_name;
79+
//else if (port_name != fdb.port_name) return port_name < fdb.port_name;
80+
//else return type <fdb.type;
81+
}
82+
83+
bool operator ==(const mclag_fdb &fdb) const
84+
{
85+
if (mac != fdb.mac)
86+
return 0;
87+
if (vid != fdb.vid)
88+
return 0;
89+
return 1;
90+
}
91+
92+
};
93+
94+
class MclagLink : public Selectable {
95+
public:
96+
const int MSG_BATCH_SIZE;
97+
ProducerStateTable * p_port_tbl;
98+
ProducerStateTable * p_lag_tbl;
99+
ProducerStateTable * p_tnl_tbl;
100+
ProducerStateTable * p_intf_tbl;
101+
ProducerStateTable *p_fdb_tbl;
102+
ProducerStateTable *p_acl_table_tbl;
103+
ProducerStateTable *p_acl_rule_tbl;
104+
DBConnector *p_appl_db;
105+
DBConnector *p_asic_db; /*redis client access to ASIC_DB*/
106+
DBConnector *p_counters_db; /*redis client access to COUNTERS_DB*/
107+
std::set <mclag_fdb> *p_old_fdb;
108+
109+
MclagLink(uint16_t port = MCLAG_DEFAULT_PORT);
110+
virtual ~MclagLink();
111+
112+
/* Wait for connection (blocking) */
113+
void accept();
114+
115+
int getFd() override;
116+
uint64_t readData() override;
117+
118+
/* readMe throws MclagConnectionClosedException when connection is lost */
119+
class MclagConnectionClosedException : public std::exception
120+
{
121+
};
122+
123+
private:
124+
unsigned int m_bufSize;
125+
char *m_messageBuffer;
126+
char *m_messageBuffer_send;
127+
unsigned int m_pos;
128+
129+
bool m_connected;
130+
bool m_server_up;
131+
int m_server_socket;
132+
int m_connection_socket;
133+
134+
void getOidToPortNameMap(std::unordered_map<std::string, std:: string> & port_map);
135+
void getBridgePortIdToAttrPortIdMap(std::map<std::string, std:: string> *oid_map);
136+
void getVidByBvid(std::string &bvid, std::string &vlanid);
137+
void getFdbSet(std::set<mclag_fdb> *fdb_set);
138+
void setPortIsolate(char *msg);
139+
void setPortMacLearnMode(char *msg);
140+
void setFdbFlush();
141+
void setFdbFlushByPort(char *msg);
142+
void setIntfMac(char *msg);
143+
void setFdbEntry(char *msg, int msg_len);
144+
ssize_t getFdbChange(char *msg_buf);
145+
void connectionLostHandlePortIsolate();
146+
void connectionLostHandlePortLearnMode();
147+
void connectionLost();
148+
};
149+
150+
}
151+
#endif
152+

mclagsyncd/mclagsyncd.cpp

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/* Copyright(c) 2016-2019 Nephos.
2+
*
3+
* This program is free software; you can redistribute it and/or modify it
4+
* under the terms and conditions of the GNU General Public License,
5+
* version 2, as published by the Free Software Foundation.
6+
*
7+
* This program is distributed in the hope it will be useful, but WITHOUT
8+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
9+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
10+
* more details.
11+
*
12+
* You should have received a copy of the GNU General Public License along with
13+
* this program; if not, see <http://www.gnu.org/licenses/>.
14+
*
15+
* The full GNU General Public License is included in this distribution in
16+
* the file called "COPYING".
17+
*
18+
* Maintainer: Jim Jiang from nephos
19+
*/
20+
#include <iostream>
21+
#include "logger.h"
22+
#include <map>
23+
#include "select.h"
24+
#include "netdispatcher.h"
25+
#include "mclagsyncd/mclaglink.h"
26+
#include <set>
27+
28+
using namespace std;
29+
using namespace swss;
30+
31+
int main(int argc, char **argv)
32+
{
33+
swss::Logger::linkToDbNative("mclagsyncd");
34+
DBConnector appl_db("APPL_DB", 0);
35+
DBConnector asic_db("ASIC_DB", 0);
36+
DBConnector counters_db("COUNTERS_DB", 0);
37+
ProducerStateTable port_tbl(&appl_db, APP_PORT_TABLE_NAME);
38+
ProducerStateTable lag_tbl(&appl_db, APP_LAG_TABLE_NAME);
39+
ProducerStateTable tnl_tbl(&appl_db, APP_VXLAN_TUNNEL_TABLE_NAME);
40+
ProducerStateTable intf_tbl(&appl_db, APP_INTF_TABLE_NAME);
41+
ProducerStateTable fdb_tbl(&appl_db, APP_FDB_TABLE_NAME);
42+
ProducerStateTable acl_table_tbl(&appl_db, APP_ACL_TABLE_TABLE_NAME);
43+
ProducerStateTable acl_rule_tbl(&appl_db, APP_ACL_RULE_TABLE_NAME);
44+
map <string, string> isolate;
45+
RedisPipeline pipeline(&appl_db);
46+
set <mclag_fdb> old_fdb;
47+
48+
while (1)
49+
{
50+
try
51+
{
52+
MclagLink mclag;
53+
Select s;
54+
55+
mclag.p_port_tbl = &port_tbl;
56+
mclag.p_lag_tbl = &lag_tbl;
57+
mclag.p_tnl_tbl = &tnl_tbl;
58+
mclag.p_intf_tbl = &intf_tbl;
59+
mclag.p_fdb_tbl = &fdb_tbl;
60+
mclag.p_acl_table_tbl = &acl_table_tbl;
61+
mclag.p_acl_rule_tbl = &acl_rule_tbl;
62+
mclag.p_appl_db = &appl_db;
63+
mclag.p_asic_db = &asic_db;
64+
mclag.p_counters_db = &counters_db;
65+
mclag.p_old_fdb = &old_fdb;
66+
67+
cout << "Waiting for connection..." << endl;
68+
mclag.accept();
69+
cout << "Connected!" << endl;
70+
71+
s.addSelectable(&mclag);
72+
73+
while (true)
74+
{
75+
Selectable *temps;
76+
77+
/* Reading MCLAG messages forever (and calling "readData" to read them) */
78+
s.select(&temps);
79+
pipeline.flush();
80+
SWSS_LOG_DEBUG("Pipeline flushed");
81+
}
82+
}
83+
catch (MclagLink::MclagConnectionClosedException &e)
84+
{
85+
cout << "Connection lost, reconnecting..." << endl;
86+
}
87+
catch (const exception& e)
88+
{
89+
cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl;
90+
return 0;
91+
}
92+
}
93+
94+
return 1;
95+
}
96+

orchagent/bufferorch.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ BufferOrch::BufferOrch(DBConnector *db, vector<string> &tableNames) :
3939
m_flexCounterDb(new DBConnector("FLEX_COUNTER_DB", 0)),
4040
m_flexCounterTable(new ProducerTable(m_flexCounterDb.get(), FLEX_COUNTER_TABLE)),
4141
m_flexCounterGroupTable(new ProducerTable(m_flexCounterDb.get(), FLEX_COUNTER_GROUP_TABLE)),
42-
m_countersDb(new DBConnector("COUNTERS_DB", 0)),
43-
m_countersDbRedisClient(m_countersDb.get())
42+
m_countersDb(new DBConnector("COUNTERS_DB", 0))
4443
{
4544
SWSS_LOG_ENTER();
4645
initTableHandlers();
@@ -361,7 +360,7 @@ task_process_status BufferOrch::processBufferPool(Consumer &consumer)
361360
// Specifically, we push the buffer pool name to oid mapping upon the creation of the oid
362361
// In pg and queue case, this mapping installment is deferred to FlexCounterOrch at a reception of field
363362
// "FLEX_COUNTER_STATUS"
364-
m_countersDbRedisClient.hset(COUNTERS_BUFFER_POOL_NAME_MAP, object_name, sai_serialize_object_id(sai_object));
363+
m_countersDb->hset(COUNTERS_BUFFER_POOL_NAME_MAP, object_name, sai_serialize_object_id(sai_object));
365364
}
366365
}
367366
else if (op == DEL_COMMAND)
@@ -375,7 +374,7 @@ task_process_status BufferOrch::processBufferPool(Consumer &consumer)
375374
SWSS_LOG_NOTICE("Removed buffer pool %s with type %s", object_name.c_str(), map_type_name.c_str());
376375
auto it_to_delete = (m_buffer_type_maps[map_type_name])->find(object_name);
377376
(m_buffer_type_maps[map_type_name])->erase(it_to_delete);
378-
m_countersDbRedisClient.hdel(COUNTERS_BUFFER_POOL_NAME_MAP, object_name);
377+
m_countersDb->hdel(COUNTERS_BUFFER_POOL_NAME_MAP, object_name);
379378
}
380379
else
381380
{

orchagent/bufferorch.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ class BufferOrch : public Orch
6464
unique_ptr<ProducerTable> m_flexCounterTable;
6565

6666
unique_ptr<DBConnector> m_countersDb;
67-
RedisClient m_countersDbRedisClient;
6867

6968
bool m_isBufferPoolWatermarkCounterIdListGenerated = false;
7069
};

orchagent/countercheckorch.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,12 @@ QueueMcCounters CounterCheckOrch::getQueueMcCounters(
193193

194194
vector<FieldValueTuple> fieldValues;
195195
QueueMcCounters counters;
196-
RedisClient redisClient(m_countersDb.get());
197196

198197
for (uint8_t prio = 0; prio < port.m_queue_ids.size(); prio++)
199198
{
200199
sai_object_id_t queueId = port.m_queue_ids[prio];
201200
auto queueIdStr = sai_serialize_object_id(queueId);
202-
auto queueType = redisClient.hget(COUNTERS_QUEUE_TYPE_MAP, queueIdStr);
201+
auto queueType = m_countersDb->hget(COUNTERS_QUEUE_TYPE_MAP, queueIdStr);
203202

204203
if (queueType.get() == nullptr || *queueType != "SAI_QUEUE_TYPE_MULTICAST" || !m_countersTable->get(queueIdStr, fieldValues))
205204
{

orchagent/pfcwdorch.cpp

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -364,9 +364,8 @@ void PfcWdSwOrch<DropHandler, ForwardHandler>::disableBigRedSwitchMode()
364364
}
365365

366366
auto queueId = entry.first;
367-
RedisClient redisClient(this->getCountersDb().get());
368367
string countersKey = this->getCountersTable()->getTableName() + this->getCountersTable()->getTableNameSeparator() + sai_serialize_object_id(queueId);
369-
redisClient.hdel(countersKey, "BIG_RED_SWITCH_MODE");
368+
this->getCountersDb()->hdel(countersKey, "BIG_RED_SWITCH_MODE");
370369
}
371370

372371
m_brsEntryMap.clear();
@@ -642,9 +641,8 @@ void PfcWdSwOrch<DropHandler, ForwardHandler>::unregisterFromWdDb(const Port& po
642641
m_entryMap.erase(queueId);
643642

644643
// Clean up
645-
RedisClient redisClient(this->getCountersDb().get());
646644
string countersKey = this->getCountersTable()->getTableName() + this->getCountersTable()->getTableNameSeparator() + sai_serialize_object_id(queueId);
647-
redisClient.hdel(countersKey, {"PFC_WD_DETECTION_TIME", "PFC_WD_RESTORATION_TIME", "PFC_WD_ACTION", "PFC_WD_STATUS"});
645+
this->getCountersDb()->hdel(countersKey, {"PFC_WD_DETECTION_TIME", "PFC_WD_RESTORATION_TIME", "PFC_WD_ACTION", "PFC_WD_STATUS"});
648646
}
649647

650648
}
@@ -666,8 +664,7 @@ PfcWdSwOrch<DropHandler, ForwardHandler>::PfcWdSwOrch(
666664
c_queueAttrIds(queueAttrIds),
667665
m_pollInterval(pollInterval),
668666
m_applDb(make_shared<DBConnector>("APPL_DB", 0)),
669-
m_applTable(make_shared<Table>(m_applDb.get(), APP_PFC_WD_TABLE_NAME "_INSTORM")),
670-
m_applDbRedisClient(m_applDb.get())
667+
m_applTable(make_shared<Table>(m_applDb.get(), APP_PFC_WD_TABLE_NAME "_INSTORM"))
671668
{
672669
SWSS_LOG_ENTER();
673670

@@ -943,7 +940,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::startWdActionOnQueue(const string
943940
entry->second.handler->initCounters();
944941
// Log storm event to APPL_DB for warm-reboot purpose
945942
string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias;
946-
m_applDbRedisClient.hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
943+
m_applDb->hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
947944
}
948945
}
949946
else if (entry->second.action == PfcWdAction::PFC_WD_ACTION_DROP)
@@ -965,7 +962,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::startWdActionOnQueue(const string
965962
entry->second.handler->initCounters();
966963
// Log storm event to APPL_DB for warm-reboot purpose
967964
string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias;
968-
m_applDbRedisClient.hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
965+
m_applDb->hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
969966
}
970967
}
971968
else if (entry->second.action == PfcWdAction::PFC_WD_ACTION_FORWARD)
@@ -987,7 +984,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::startWdActionOnQueue(const string
987984
entry->second.handler->initCounters();
988985
// Log storm event to APPL_DB for warm-reboot purpose
989986
string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias;
990-
m_applDbRedisClient.hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
987+
m_applDb->hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
991988
}
992989
}
993990
else
@@ -1011,7 +1008,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::startWdActionOnQueue(const string
10111008
entry->second.handler = nullptr;
10121009
// Remove storm status in APPL_DB for warm-reboot purpose
10131010
string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias;
1014-
m_applDbRedisClient.hdel(key, to_string(entry->second.index));
1011+
m_applDb->hdel(key, to_string(entry->second.index));
10151012
}
10161013
}
10171014
else
@@ -1028,8 +1025,6 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::bake()
10281025
{
10291026
// clean all *_last and *_LEFT fields in COUNTERS_TABLE
10301027
// to allow warm-reboot pfc detect & restore state machine to enter the same init state as cold-reboot
1031-
RedisClient redisClient(this->getCountersDb().get());
1032-
10331028
vector<string> cKeys;
10341029
this->getCountersTable()->getKeys(cKeys);
10351030
for (const auto &key : cKeys)
@@ -1046,7 +1041,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::bake()
10461041
}
10471042
if (!wLasts.empty())
10481043
{
1049-
redisClient.hdel(
1044+
this->getCountersDb()->hdel(
10501045
this->getCountersTable()->getTableName()
10511046
+ this->getCountersTable()->getTableNameSeparator()
10521047
+ key,

orchagent/pfcwdorch.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,6 @@ class PfcWdSwOrch: public PfcWdOrch<DropHandler, ForwardHandler>
135135
shared_ptr<DBConnector> m_applDb = nullptr;
136136
// Track queues in storm
137137
shared_ptr<Table> m_applTable = nullptr;
138-
// used for hset and hdel
139-
RedisClient m_applDbRedisClient;
140138
};
141139

142140
#endif

0 commit comments

Comments
 (0)