1
1
#include " events_pi.h"
2
2
3
+ /*
4
+ * The uuid_unparse() function converts the supplied UUID uu from
5
+ * the binary representation into a 36-byte string (plus trailing
6
+ * '\0') of the form 1b4e28ba-2fa1-11d2-883f-0016d3cca427 and stores
7
+ * this value in the character string pointed to by out.
8
+ */
9
+ #define UUID_STR_SIZE 40
10
+
3
11
/*
4
12
* Publisher use echo service and subscriber uses cache service
5
13
* The eventd process runs this service, which could be down
18
26
* duplicates helps reduce load.
19
27
*/
20
28
21
- typedef map <string, EventPublisher> lst_publishers_t ;
29
+ typedef map <string, EventPublisher * > lst_publishers_t ;
22
30
lst_publishers_t s_publishers;
23
31
24
32
EventPublisher::EventPublisher () :
@@ -31,29 +39,30 @@ int EventPublisher::init(const string event_source)
31
39
m_zmq_ctx = zmq_ctx_new ();
32
40
m_socket = zmq_socket (m_zmq_ctx, ZMQ_PUB);
33
41
34
- int rc = zmq_connect (m_socket, get_config (XSUB_END_KEY));
35
- RET_ON_ERR (rc == 0 , " Publisher fails to connect %s" , get_config (XSUB_END_KEY));
42
+ int rc = zmq_connect (m_socket, get_config (XSUB_END_KEY). c_str () );
43
+ RET_ON_ERR (rc == 0 , " Publisher fails to connect %s" , get_config (XSUB_END_KEY). c_str () );
36
44
37
45
// REQ socket is connected and a message is sent & received, more to
38
46
// ensure PUB socket had enough time to establish connection.
39
47
// Any message published before connection establishment is dropped.
40
48
//
41
- event_service m_event_svc;
42
49
/*
43
50
* Event service could be down. So have a timeout.
44
51
*
45
52
*/
46
- rc = m_event_svc .init_client (m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MILLISECS);
53
+ rc = m_event_service .init_client (m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MILLISECS);
47
54
RET_ON_ERR (rc == 0 , " Failed to init event service" );
48
55
49
- rc = m_event_svc .echo_send (" hello" );
56
+ rc = m_event_service .echo_send (" hello" );
50
57
RET_ON_ERR (rc == 0 , " Failed to echo send in event service" );
51
58
52
- m_event_source ( event_source) ;
59
+ m_event_source = event_source;
53
60
54
61
uuid_t id;
62
+ char uuid_str[UUID_STR_SIZE];
55
63
uuid_generate (id);
56
- uuid_unparse (id, m_runtime_id);
64
+ uuid_unparse (id, uuid_str);
65
+ m_runtime_id = string (uuid_str);
57
66
58
67
m_init = true ;
59
68
out:
@@ -67,9 +76,12 @@ EventPublisher::~EventPublisher()
67
76
}
68
77
69
78
70
- void
71
- EventPublisher::event_publish (const string tag, const event_params_t *params)
79
+ int
80
+ EventPublisher::publish (const string tag, const event_params_t *params)
72
81
{
82
+ int rc;
83
+ internal_event_t event_data;
84
+
73
85
if (m_event_service.is_active ()) {
74
86
string s;
75
87
@@ -78,28 +90,41 @@ EventPublisher::event_publish(const string tag, const event_params_t *params)
78
90
* as provided in publisher init.
79
91
*/
80
92
m_event_service.echo_receive (s);
93
+ m_event_service.close_service ();
81
94
}
82
95
83
96
string param_str;
84
- if ((params != NULL ) && (param->find (event_ts) != params->end ())) {
85
-
86
- param_str = serialize (*params);
97
+ event_params_t evt_params;
98
+ if (params != NULL ) {
99
+ if (params->find (event_ts_param) == params->end ()) {
100
+ evt_params = *params;
101
+ evt_params[event_ts_param] = get_timestamp ();
102
+ params = &evt_params;
103
+ }
87
104
}
88
105
else {
89
- event_params_t evt_params = *params;
90
- evt_params[event_ts] = get_timestamp ();
91
- param_str = serialize (evt_params);
106
+ evt_params[event_ts_param] = get_timestamp ();
107
+ params = &evt_params;
92
108
}
109
+ rc = serialize (*params, param_str);
110
+ RET_ON_ERR (rc == 0 , " failed to serialize params %s" ,
111
+ map_to_str (*params).c_str ());
93
112
94
- map_str_str_t event_str = { { m_event_source + " :" + tag, param_str}};
113
+ {
114
+ map_str_str_t event_str_map = { { m_event_source + " :" + tag, param_str}};
95
115
96
- ++m_sequence;
97
- internal_event_t event_data;
98
- event_data[EVENT_STR_DATA] = serialize (event_str);
116
+ rc = serialize (event_str_map, event_data[EVENT_STR_DATA]);
117
+ RET_ON_ERR (rc == 0 , " failed to serialize event str %s" ,
118
+ map_to_str (event_str_map));
119
+ }
99
120
event_data[EVENT_RUNTIME_ID] = m_runtime_id;
121
+ ++m_sequence;
100
122
event_data[EVENT_SEQUENCE] = seq_to_str (m_sequence);
101
123
102
- return zmq_message_send (m_socket, m_event_source, event_data);
124
+ rc = zmq_message_send (m_socket, m_event_source, event_data);
125
+ RET_ON_ERR (rc == 0 , " failed to send for tag %s" , tag.c_str ());
126
+ out:
127
+ return rc;
103
128
}
104
129
105
130
event_handle_t
@@ -121,7 +146,7 @@ events_init_publisher(const string event_source)
121
146
}
122
147
else {
123
148
ret = p;
124
- s_publishers[key ] = p;
149
+ s_publishers[event_source ] = p;
125
150
}
126
151
}
127
152
return ret;
131
156
events_deinit_publisher (event_handle_t &handle)
132
157
{
133
158
lst_publishers_t ::iterator it;
134
- EventPublisher *pub = NULL ;
135
159
136
160
for (it=s_publishers.begin (); it != s_publishers.end (); ++it) {
137
161
if (it->second == handle) {
@@ -144,12 +168,13 @@ events_deinit_publisher(event_handle_t &handle)
144
168
145
169
}
146
170
147
- void
171
+ int
148
172
event_publish (event_handle_t handle, const string tag, const event_params_t *params)
149
173
{
150
- for (it=s_publishers.begin (); it != s_publishers.end (); ++it) {
151
- if (it->second == handle) {
152
- return it->second ->event_publish (tag, params);
174
+ lst_publishers_t ::const_iterator itc;
175
+ for (itc=s_publishers.begin (); itc != s_publishers.end (); ++itc) {
176
+ if (itc->second == handle) {
177
+ return itc->second ->publish (tag, params);
153
178
}
154
179
}
155
180
return -1 ;
@@ -175,17 +200,17 @@ EventSubscriber::~EventSubscriber()
175
200
*/
176
201
int rc = 0 ;
177
202
178
- if (m_use_cache ) {
203
+ if (m_event_service. is_active () ) {
179
204
events_data_lst_t events;
180
205
181
- rc = m_event_svc .cache_init ();
206
+ rc = m_event_service .cache_init ();
182
207
RET_ON_ERR (rc == 0 , " Failed to init the cache" );
183
208
184
209
/* Shadow the cache init request, as it is async */
185
- m_event_svc .send_recv (EVENT_ECHO);
210
+ m_event_service .send_recv (EVENT_ECHO);
186
211
187
212
/* read for 2 seconds in non-block mode, to drain any local cache */
188
- chrono::steady_clock::timepoint start = chrono::steady_clock::now ();
213
+ chrono::steady_clock::time_point start = chrono::steady_clock::now ();
189
214
while (true ) {
190
215
string source, evt_str;
191
216
rc = zmq_message_read (m_socket, ZMQ_DONTWAIT, source, evt_str);
@@ -196,15 +221,16 @@ EventSubscriber::~EventSubscriber()
196
221
break ;
197
222
}
198
223
events.push_back (evt_str);
199
- chrono::steady_clock::timepoint now = chrono::steady_clock::now ();
200
- if (chrono::duration_cast<std::chrono::milliseconds>(now - start) >
224
+ chrono::steady_clock::time_point now = chrono::steady_clock::now ();
225
+ if (chrono::duration_cast<std::chrono::milliseconds>(now - start). count () >
201
226
CACHE_DRAIN_IN_MILLISECS)
202
227
break ;
203
228
}
204
229
205
230
/* Start cache service with locally read events as initial stock */
206
- RET_ON_ERR (m_event_svc .cache_start (events) == 0 ,
231
+ RET_ON_ERR (m_event_service .cache_start (events) == 0 ,
207
232
" Failed to send cache start" );
233
+ m_event_service.close_service ();
208
234
}
209
235
out:
210
236
zmq_close (m_socket);
@@ -227,29 +253,28 @@ EventSubscriber::init(bool use_cache, const event_subscribe_sources_t *subs_sour
227
253
228
254
m_socket = zmq_socket (m_zmq_ctx, ZMQ_SUB);
229
255
230
- int rc = zmq_connect (m_socket, get_config (XPUB_END_KEY));
231
- RET_ON_ERR (rc == 0 , " Subscriber fails to connect %s" , get_config (XPUB_END_KEY));
256
+ int rc = zmq_connect (m_socket, get_config (XPUB_END_KEY). c_str () );
257
+ RET_ON_ERR (rc == 0 , " Subscriber fails to connect %s" , get_config (XPUB_END_KEY). c_str () );
232
258
233
259
if ((subs_sources == NULL ) || (subs_sources->empty ())) {
234
- rc = zmq_setsockopt (sub_read , ZMQ_SUBSCRIBE, " " , 0 );
260
+ rc = zmq_setsockopt (m_socket , ZMQ_SUBSCRIBE, " " , 0 );
235
261
RET_ON_ERR (rc == 0 , " Fails to set option" );
236
262
}
237
263
else {
238
264
for (const auto e: *subs_sources) {
239
- rc = zmq_setsockopt (sub_read , ZMQ_SUBSCRIBE, e.c_str (), e.size ());
265
+ rc = zmq_setsockopt (m_socket , ZMQ_SUBSCRIBE, e.c_str (), e.size ());
240
266
RET_ON_ERR (rc == 0 , " Fails to set option" );
241
267
}
242
268
}
243
269
244
270
if (use_cache) {
245
- rc = m_event_svc .init_client (m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MILLISECS);
271
+ rc = m_event_service .init_client (m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MILLISECS);
246
272
RET_ON_ERR (rc == 0 , " Fails to init the service" );
247
273
248
- if (m_event_svc .cache_stop () == 0 ) {
274
+ if (m_event_service .cache_stop () == 0 ) {
249
275
// Stopped an active cache
250
276
m_cache_read = true ;
251
277
}
252
- m_use_cache = true ;
253
278
}
254
279
m_init = true ;
255
280
out:
@@ -277,25 +302,26 @@ EventSubscriber::prune_track()
277
302
278
303
279
304
int
280
- EventSubscriber::event_receive (string && key, event_params_t ¶ms, int &missed_cnt)
305
+ EventSubscriber::event_receive (string &key, event_params_t ¶ms, int &missed_cnt)
281
306
{
307
+ int rc = 0 ;
282
308
key.clear ();
283
309
event_params_t ().swap (params);
284
310
285
311
while (key.empty ()) {
286
312
internal_event_t event_data;
287
- int rc = 0 ;
288
313
289
314
if (m_cache_read && m_from_cache.empty ()) {
290
- m_event_svc .cache_read (m_from_cache);
315
+ m_event_service .cache_read (m_from_cache);
291
316
m_cache_read = !m_from_cache.empty ();
292
317
}
293
318
294
319
if (!m_from_cache.empty ()) {
295
320
296
321
events_data_lst_t ::iterator it = m_from_cache.begin ();
297
- deserialize (*it, event_data);
322
+ rc = deserialize (*it, event_data);
298
323
m_from_cache.erase (it);
324
+ RET_ON_ERR (rc == 0 , " Failed to deserialize message from cache" );
299
325
300
326
}
301
327
else {
@@ -307,9 +333,9 @@ EventSubscriber::event_receive(string &&key, event_params_t ¶ms, int &missed
307
333
308
334
/* Find any missed events for this runtime ID */
309
335
missed_cnt = 0 ;
336
+ sequence_t seq = events_base::str_to_seq (event_data[EVENT_SEQUENCE]);
310
337
track_info_t ::iterator it = m_track.find (event_data[EVENT_RUNTIME_ID]);
311
338
if (it != m_track.end ()) {
312
- sequence_t seq = events_base::str_to_seq (event_data[EVENT_SEQUENCE]);
313
339
/* current seq - last read - 1 == 0 if none missed */
314
340
missed_cnt = seq - it->second .seq - 1 ;
315
341
it->second = evt_info_t (seq);
@@ -323,15 +349,24 @@ EventSubscriber::event_receive(string &&key, event_params_t ¶ms, int &missed
323
349
if (missed_cnt >= 0 ) {
324
350
map_str_str_t ev;
325
351
326
- deserialize (event_data[EVENT_STR_DATA], ev);
327
- RET_ON_ERR (ev.size () == 1 , " Expect string (%s) deserialize to one key" ,
328
- event_data[EVENT_STR_DATA].c_str ());
352
+ rc = deserialize (event_data[EVENT_STR_DATA], ev);
353
+ RET_ON_ERR (rc == 0 , " Failed to deserialize %s" ,
354
+ event_data[EVENT_STR_DATA].substr (0 , 32 ).c_str ());
355
+
356
+ if (ev.size () != 1 ) rc = -1 ;
357
+ RET_ON_ERR (rc == 0 , " Expect string (%s) deserialize to one key" ,
358
+ event_data[EVENT_STR_DATA].substr (0 , 32 ).c_str ());
329
359
330
360
key = ev.begin ()->first ;
331
361
332
- deserialize (ev.begin ()->second , params);
362
+ rc = deserialize (ev.begin ()->second , params);
363
+ RET_ON_ERR (rc == 0 , " failed to deserialize params %s" ,
364
+ ev.begin ()->second .substr (0 , 32 ).c_str ());
333
365
334
366
}
367
+ else {
368
+ /* negative value implies duplicate; Possibly seen from cache */
369
+ }
335
370
}
336
371
out:
337
372
return rc;
@@ -340,7 +375,7 @@ EventSubscriber::event_receive(string &&key, event_params_t ¶ms, int &missed
340
375
341
376
342
377
/* Expect only one subscriber per process */
343
- EventSubscriber *s_subscriber = NULL ;
378
+ static EventSubscriber *s_subscriber = NULL ;
344
379
345
380
event_handle_t
346
381
events_init_subscriber (bool use_cache=false ,
0 commit comments