Skip to content

Commit 1d84b90

Browse files
authored
Add ZeroMQ communication channel between sairedis and syncd (#659)
* [sairedis] Use separate db connector for VID index generator * [sairedis] Add Channel class * [sairedis] Start using Channel class in RedisChannel * [sairedis] Add zmq configuration to ContextConfig * [sairedis] Change reddis channel class to base channel class * [syncd] Fix aspell * [sairedis] Disable zeromq by default * [sairedis] Add ZeroMQChannel class * Update aspell * [syncd] Add NotificationProducerBase class * [syncd] Add RedisNotificationProducer class * [syncd] Start using RediisNotificationProducer class * [syncd] Add ZeroMQNotificationProducer class * [syncd] Start uisng ZeroMQNotificationProducer class * [sairedis] Start using ZeroMQChannel clas * [saiplayer] Use lib zmq in saiplayer Makefile * [tests] Add libzmq to makefile * [sairedis] Force sync mode when zmq enabled * [syncd] Force sync mode when zmq enabled * [syncd] Add libzmq to makefile * [syncd] Add SelectableChannel class * [syncd] Add RedisSelectableChannel class * [syncd] Start using SelectableChannel base * [saidump] Add libzmq to makefile * [syncd] Add ZeroMQSelectableChannel class * [syncd] Start using ZeroMQSelectableChannel * [sairedis] Remove unused includes from Channel class * [sairedis] Fix ZeroMQChannel error checks * [syncd] Use zmq_poll in separate thread to ignore edge trigger poll * [sairedis] Fix aspell * [tests] Update aspell dict * [tests] Add zmq channel unittests * [syncd] Fix ZeroMQNotificationProducer connect error code * Update aspell * [syncd] Increase ZeroMQSelectableChannel timeout to 2min * Update aspell * Modify libzmq order in Makefiles * [syncd] Fix merge issues
1 parent 017056a commit 1d84b90

39 files changed

+1495
-169
lines changed

lib/inc/Channel.h

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#pragma once
2+
3+
#include "swss/producertable.h"
4+
#include "swss/consumertable.h"
5+
#include "swss/notificationconsumer.h"
6+
#include "swss/selectableevent.h"
7+
#include "swss/sal.h"
8+
9+
extern "C" {
10+
#include "sai.h"
11+
}
12+
13+
#include <memory>
14+
#include <functional>
15+
16+
namespace sairedis
17+
{
18+
class Channel
19+
{
20+
public:
21+
22+
typedef std::function<void(const std::string&,const std::string&, const std::vector<swss::FieldValueTuple>&)> Callback;
23+
24+
public:
25+
26+
Channel(
27+
_In_ Callback callback);
28+
29+
virtual ~Channel();
30+
31+
public:
32+
33+
virtual void setBuffered(
34+
_In_ bool buffered) = 0;
35+
36+
virtual void flush() = 0;
37+
38+
virtual void set(
39+
_In_ const std::string& key,
40+
_In_ const std::vector<swss::FieldValueTuple>& values,
41+
_In_ const std::string& command) = 0;
42+
43+
virtual void del(
44+
_In_ const std::string& key,
45+
_In_ const std::string& command) = 0;
46+
47+
virtual sai_status_t wait(
48+
_In_ const std::string& command,
49+
_Out_ swss::KeyOpFieldsValuesTuple& kco) = 0;
50+
51+
protected:
52+
53+
virtual void notificationThreadFunction() = 0;
54+
55+
protected:
56+
57+
Callback m_callback;
58+
59+
protected: // notification
60+
61+
/**
62+
* @brief Indicates whether notification thread should be running.
63+
*/
64+
volatile bool m_runNotificationThread;
65+
66+
/**
67+
* @brief Event used to nice end notifications thread.
68+
*/
69+
swss::SelectableEvent m_notificationThreadShouldEndEvent;
70+
71+
/**
72+
* @brief Notification thread
73+
*/
74+
std::shared_ptr<std::thread> m_notificationThread;
75+
};
76+
}

lib/inc/ContextConfig.h

+6
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ namespace sairedis
3838

3939
std::string m_dbState;
4040

41+
bool m_zmqEnable;
42+
43+
std::string m_zmqEndpoint;
44+
45+
std::string m_zmqNtfEndpoint;
46+
4147
std::shared_ptr<SwitchConfigContainer> m_scc;
4248
};
4349
}

lib/inc/RedisChannel.h

+15-36
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "Channel.h"
34
#include "RemoteSaiInterface.h"
45
#include "SwitchContainer.h"
56
#include "VirtualObjectIdManager.h"
@@ -17,49 +18,42 @@
1718

1819
namespace sairedis
1920
{
20-
class RedisChannel
21+
class RedisChannel:
22+
public Channel
2123
{
22-
public:
23-
24-
typedef std::function<void(const std::string&,const std::string&, const std::vector<swss::FieldValueTuple>&)> Callback;
25-
2624
public:
2725

2826
RedisChannel(
2927
_In_ const std::string& dbAsic,
30-
_In_ Callback callback);
28+
_In_ Channel::Callback callback);
3129

3230
virtual ~RedisChannel();
3331

3432
public:
3533

3634
std::shared_ptr<swss::DBConnector> getDbConnector() const;
3735

38-
void setBuffered(
39-
_In_ bool buffered);
36+
virtual void setBuffered(
37+
_In_ bool buffered) override;
4038

41-
void flush();
39+
virtual void flush() override;
4240

43-
void set(
41+
virtual void set(
4442
_In_ const std::string& key,
4543
_In_ const std::vector<swss::FieldValueTuple>& values,
46-
_In_ const std::string& command);
44+
_In_ const std::string& command) override;
4745

48-
void del(
46+
virtual void del(
4947
_In_ const std::string& key,
50-
_In_ const std::string& command);
48+
_In_ const std::string& command) override;
5149

52-
sai_status_t wait(
50+
virtual sai_status_t wait(
5351
_In_ const std::string& command,
54-
_Out_ swss::KeyOpFieldsValuesTuple& kco);
55-
56-
private:
57-
58-
void notificationThreadFunction();
52+
_Out_ swss::KeyOpFieldsValuesTuple& kco) override;
5953

60-
private:
54+
protected:
6155

62-
Callback m_callback;
56+
virtual void notificationThreadFunction() override;
6357

6458
private:
6559

@@ -85,11 +79,6 @@ namespace sairedis
8579

8680
private: // notification
8781

88-
/**
89-
* @brief Indicates whether notification thread should be running.
90-
*/
91-
volatile bool m_runNotificationThread;
92-
9382
/**
9483
* @brief Database connector used for notifications.
9584
*/
@@ -99,15 +88,5 @@ namespace sairedis
9988
* @brief Notification consumer.
10089
*/
10190
std::shared_ptr<swss::NotificationConsumer> m_notificationConsumer;
102-
103-
/**
104-
* @brief Event used to nice end notifications thread.
105-
*/
106-
swss::SelectableEvent m_notificationThreadShouldEndEvent;
107-
108-
/**
109-
* @brief Notification thread
110-
*/
111-
std::shared_ptr<std::thread> m_notificationThread;
11291
};
11392
}

lib/inc/RedisRemoteSaiInterface.h

+6-9
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "SkipRecordAttrContainer.h"
1010
#include "RedisChannel.h"
1111
#include "SwitchConfigContainer.h"
12+
#include "ContextConfig.h"
1213

1314
#include "swss/producertable.h"
1415
#include "swss/consumertable.h"
@@ -72,9 +73,7 @@ namespace sairedis
7273
public:
7374

7475
RedisRemoteSaiInterface(
75-
_In_ uint32_t globalContext,
76-
_In_ std::shared_ptr<SwitchConfigContainer> scc,
77-
_In_ const std::string& dbAsic,
76+
_In_ std::shared_ptr<ContextConfig> contextConfig,
7877
_In_ std::function<sai_switch_notifications_t(std::shared_ptr<Notification>)> notificationCallback,
7978
_In_ std::shared_ptr<Recorder> recorder);
8079

@@ -438,9 +437,7 @@ namespace sairedis
438437

439438
private:
440439

441-
uint32_t m_globalContext;
442-
443-
std::shared_ptr<SwitchConfigContainer> m_switchConfigContainer;
440+
std::shared_ptr<ContextConfig> m_contextConfig;
444441

445442
bool m_asicInitViewMode;
446443

@@ -456,18 +453,18 @@ namespace sairedis
456453

457454
std::shared_ptr<VirtualObjectIdManager> m_virtualObjectIdManager;
458455

456+
std::shared_ptr<swss::DBConnector> m_db;
457+
459458
std::shared_ptr<RedisVidIndexGenerator> m_redisVidIndexGenerator;
460459

461460
std::weak_ptr<saimeta::Meta> m_meta;
462461

463462
std::shared_ptr<SkipRecordAttrContainer> m_skipRecordAttrContainer;
464463

465-
std::shared_ptr<RedisChannel> m_redisChannel;
464+
std::shared_ptr<Channel> m_communicationChannel;
466465

467466
std::function<sai_switch_notifications_t(std::shared_ptr<Notification>)> m_notificationCallback;
468467

469-
std::string m_dbAsic;
470-
471468
std::map<sai_object_id_t, swss::TableDump> m_tableDump;
472469
};
473470
}

lib/inc/ZeroMQChannel.h

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#pragma once
2+
3+
#include "Channel.h"
4+
5+
#include "swss/producertable.h"
6+
#include "swss/consumertable.h"
7+
#include "swss/notificationconsumer.h"
8+
#include "swss/selectableevent.h"
9+
10+
#include <memory>
11+
#include <functional>
12+
13+
namespace sairedis
14+
{
15+
class ZeroMQChannel:
16+
public Channel
17+
{
18+
public:
19+
20+
ZeroMQChannel(
21+
_In_ const std::string& endpoint,
22+
_In_ const std::string& ntfEndpoint,
23+
_In_ Channel::Callback callback);
24+
25+
virtual ~ZeroMQChannel();
26+
27+
public:
28+
29+
virtual void setBuffered(
30+
_In_ bool buffered) override;
31+
32+
virtual void flush() override;
33+
34+
virtual void set(
35+
_In_ const std::string& key,
36+
_In_ const std::vector<swss::FieldValueTuple>& values,
37+
_In_ const std::string& command) override;
38+
39+
virtual void del(
40+
_In_ const std::string& key,
41+
_In_ const std::string& command) override;
42+
43+
virtual sai_status_t wait(
44+
_In_ const std::string& command,
45+
_Out_ swss::KeyOpFieldsValuesTuple& kco) override;
46+
47+
protected:
48+
49+
virtual void notificationThreadFunction() override;
50+
51+
private:
52+
53+
std::string m_endpoint;
54+
55+
std::string m_ntfEndpoint;
56+
57+
std::vector<uint8_t> m_buffer;
58+
59+
void* m_context;
60+
61+
void* m_socket;
62+
63+
void* m_ntfContext;
64+
65+
void* m_ntfSocket;
66+
67+
};
68+
}

lib/src/Channel.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#include "Channel.h"
2+
3+
#include "swss/logger.h"
4+
5+
using namespace sairedis;
6+
7+
Channel::Channel(
8+
_In_ Callback callback):
9+
m_callback(callback)
10+
{
11+
SWSS_LOG_ENTER();
12+
13+
// empty
14+
}
15+
16+
Channel::~Channel()
17+
{
18+
SWSS_LOG_ENTER();
19+
20+
// empty
21+
}

lib/src/Context.cpp

+1-3
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ Context::Context(
1717

1818
// will create notification thread
1919
m_redisSai = std::make_shared<RedisRemoteSaiInterface>(
20-
m_contextConfig->m_guid,
21-
m_contextConfig->m_scc,
22-
m_contextConfig->m_dbAsic,
20+
m_contextConfig,
2321
std::bind(&Context::handle_notification, this, _1),
2422
m_recorder);
2523

lib/src/ContextConfig.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ ContextConfig::ContextConfig(
1616
m_dbAsic(dbAsic),
1717
m_dbCounters(dbCounters),
1818
m_dbFlex(dbFlex),
19-
m_dbState(dbState)
19+
m_dbState(dbState),
20+
m_zmqEnable(false)
2021
{
2122
SWSS_LOG_ENTER();
2223

lib/src/ContextConfigContainer.cpp

+11-2
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ std::shared_ptr<ContextConfigContainer> ContextConfigContainer::getDefault()
3535
auto sc = std::make_shared<SwitchConfig>(0, "");
3636

3737
cc->insert(sc);
38-
38+
3939
ccc->m_map[0] = cc;
40-
40+
4141
return ccc;
4242
}
4343

@@ -113,6 +113,15 @@ std::shared_ptr<ContextConfigContainer> ContextConfigContainer::loadFromFile(
113113

114114
auto cc = std::make_shared<ContextConfig>(guid, name, dbAsic, dbCounters, dbFlex, dbState);
115115

116+
cc->m_zmqEnable = item["zmq_enable"];
117+
cc->m_zmqEndpoint = item["zmq_endpoint"];
118+
cc->m_zmqNtfEndpoint = item["zmq_ntf_endpoint"];
119+
120+
SWSS_LOG_NOTICE("contextConfig zmq enable %s, endpoint: %s, ntf endpoint: %s",
121+
(cc->m_zmqEnable) ? "true" : "false",
122+
cc->m_zmqEndpoint.c_str(),
123+
cc->m_zmqNtfEndpoint.c_str());
124+
116125
for (size_t k = 0; k < item["switches"].size(); k++)
117126
{
118127
json& sw = item["switches"][k];

lib/src/Makefile.am

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ lib_LTLIBRARIES = libsairedis.la
1010

1111
noinst_LIBRARIES = libSaiRedis.a
1212
libSaiRedis_a_SOURCES = \
13+
ZeroMQChannel.cpp \
14+
Channel.cpp \
1315
Context.cpp \
1416
ContextConfigContainer.cpp \
1517
ContextConfig.cpp \
@@ -92,7 +94,7 @@ bin_PROGRAMS = tests
9294

9395
tests_SOURCES = tests.cpp
9496
tests_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON)
95-
tests_LDADD = -lhiredis -lswsscommon -lpthread $(top_srcdir)/meta/libsaimetadata.la $(top_srcdir)/meta/libsaimeta.la libsairedis.la
97+
tests_LDADD = -lhiredis -lswsscommon -lpthread $(top_srcdir)/meta/libsaimetadata.la $(top_srcdir)/meta/libsaimeta.la libsairedis.la -lzmq
9698

9799
TESTS = tests
98100

0 commit comments

Comments
 (0)