10
10
#include " json.hpp"
11
11
#include " zmq.h"
12
12
#include < unordered_map>
13
+ #include < boost/serialization/vector.hpp>
14
+ #include < boost/serialization/map.hpp>
15
+ #include < boost/archive/text_iarchive.hpp>
16
+ #include < boost/archive/text_oarchive.hpp>
13
17
14
18
#include " logger.h"
15
19
#include " events.h"
@@ -20,12 +24,18 @@ using namespace chrono;
20
24
extern int errno;
21
25
extern int zerrno;
22
26
27
+ /*
28
+ * Max count of possible concurrent event publishers
29
+ * A rough estimate only, more as a guideline than strict.
30
+ * So this does not limit any usage
31
+ */
32
+ #define MAX_PUBLISHERS_COUNT 1000
23
33
24
34
#define RET_ON_ERR (res, msg, ...)\
25
35
if (!(res)) {\
26
36
int e = errno; \
27
37
zerrno = zmq_errno (); \
28
- string fmt = " errno:%d zmq_errno:%d " + msg; \
38
+ string fmt = string ( " errno:%d zmq_errno:%d " ) + msg; \
29
39
SWSS_LOG_ERROR (fmt.c_str (), e, zerrno, ##__VA_ARGS__); \
30
40
goto out; }
31
41
@@ -64,9 +74,9 @@ T get_config_data(const string key, T def)
64
74
return def;
65
75
}
66
76
else {
67
- stringstream ss (get_config (key));
68
-
69
77
T v;
78
+ stringstream ss (s);
79
+
70
80
ss >> v;
71
81
72
82
return v;
@@ -78,15 +88,56 @@ string get_config(const string key);
78
88
79
89
const string get_timestamp ();
80
90
81
- const string serialize (const map_str_str_t & data);
91
+ /*
92
+ * Way to serialize map or vector
93
+ * boost::archive::text_oarchive could be used to archive any struct/class
94
+ * but that class needs some additional support, that declares
95
+ * boost::serialization::access as private friend and couple more tweaks
96
+ * std::map inherently supports serialization
97
+ */
98
+ template <typename Map>
99
+ const string
100
+ serialize (const Map& data)
101
+ {
102
+ std::stringstream ss;
103
+ boost::archive::text_oarchive oarch (ss);
104
+ oarch << data;
105
+ return ss.str ();
106
+ }
107
+
108
+ template <typename Map>
109
+ void
110
+ deserialize (const string& s, Map& data)
111
+ {
112
+ std::stringstream ss;
113
+ ss << s;
114
+ boost::archive::text_iarchive iarch (ss);
115
+ iarch >> data;
116
+ return ;
117
+ }
82
118
83
- void deserialize (const string& s, map_str_str_t & data);
84
119
85
120
template <typename Map>
86
- int map_to_zmsg (const Map & data, zmq_msg_t &msg);
121
+ int
122
+ map_to_zmsg (const Map& data, zmq_msg_t &msg)
123
+ {
124
+ string s = serialize (data);
125
+
126
+ int rc = zmq_msg_init_size (&msg, s.size ());
127
+ if (rc == 0 ) {
128
+ strncpy ((char *)zmq_msg_data (&msg), s.c_str (), s.size ());
129
+ }
130
+ return rc;
131
+ }
132
+
87
133
88
134
template <typename Map>
89
- void zmsg_to_map (zmq_msg_t &msg, Map &data);
135
+ void
136
+ zmsg_to_map (zmq_msg_t &msg, Map& data)
137
+ {
138
+ string s ((const char *)zmq_msg_data (&msg), zmq_msg_size (&msg));
139
+ deserialize (s, data);
140
+ }
90
141
91
142
92
143
/*
@@ -168,7 +219,7 @@ zmq_message_send(void *sock, p1 pt1, p2 pt2)
168
219
{
169
220
int rc = zmq_send_part (sock, pt2.empty () ? 0 : ZMQ_SNDMORE, pt1);
170
221
171
- if (rc == 0 ) {
222
+ if (( rc == 0 ) && (!pt2. empty ()) ) {
172
223
rc = zmq_send_part (sock, 0 , pt2);
173
224
}
174
225
return rc;
@@ -183,17 +234,14 @@ zmq_message_read(void *sock, int flag, p1 pt1, p2 pt2)
183
234
184
235
rc = zmq_read_part (sock, flag, more, pt1);
185
236
RET_ON_ERR (rc == 0 , " Failed to read part1" );
186
- RET_ON_ERR (more, " Expect 2 parts" );
187
237
188
- rc = zmq_read_part (sock, 0 , more, pt2);
189
- RET_ON_ERR (rc == 0 , " Failed to read part1" );
190
- RET_ON_ERR (!more, " Don't expect more than 2 parts" );
238
+ if (more) {
239
+ rc = zmq_read_part (sock, 0 , more, pt2);
240
+ RET_ON_ERR (rc == 0 , " Failed to read part1" );
241
+ RET_ON_ERR (!more, " Don't expect more than 2 parts" );
242
+ }
191
243
192
244
out:
193
245
return rc;
194
246
}
195
247
196
-
197
-
198
-
199
-
0 commit comments