1
1
#include " events_pi.h"
2
2
3
+ /*
4
+ * Publisher use echo service and subscriber uses cache service
5
+ * The eventd process runs this service, which could be down
6
+ * All service interactions being async, a timeout is required
7
+ * not to block forever on read.
8
+ *
9
+ * The unit is in milliseconds (in sync with ZMQ_RCVTIMEO of
10
+ * zmq_setsockopt.
11
+ */
12
+
13
+ #define EVENTS_SERVICE_TIMEOUT_MILLISECS 200
14
+
3
15
/*
4
16
* Track created publishers to avoid duplicates
5
17
* As we track missed event count by publishing instances, avoiding
6
18
* duplicates helps reduce load.
7
19
*/
20
+
8
21
typedef map <string, EventPublisher> lst_publishers_t ;
9
22
lst_publishers_t s_publishers;
10
23
11
- EventPublisher::EventPublisher (const string source ) :
24
+ EventPublisher::EventPublisher () :
12
25
m_zmq_ctx(NULL ), m_socket(NULL ), m_sequence(0 ), m_init(false )
13
26
{
14
27
}
15
28
16
- int EventPublisher::init (const string event_source, int block_ms )
29
+ int EventPublisher::init (const string event_source)
17
30
{
18
31
m_zmq_ctx = zmq_ctx_new ();
19
32
m_socket = zmq_socket (m_zmq_ctx, ZMQ_PUB);
@@ -26,7 +39,11 @@ int EventPublisher::init(const string event_source, int block_ms)
26
39
// Any message published before connection establishment is dropped.
27
40
//
28
41
event_service m_event_svc;
29
- rc = m_event_svc.init_client (block_ms);
42
+ /*
43
+ * Event service could be down. So have a timeout.
44
+ *
45
+ */
46
+ rc = m_event_svc.init_client (m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MILLISECS);
30
47
RET_ON_ERR (rc == 0 , " Failed to init event service" );
31
48
32
49
rc = m_event_svc.echo_send (" hello" );
@@ -57,7 +74,7 @@ EventPublisher::event_publish(const string tag, const event_params_t *params)
57
74
string s;
58
75
59
76
/* Failure is no-op; The eventd service my be down
60
- * NOTE: This call atmost blocks for block_ms milliseconds
77
+ * NOTE: This call atmost blocks for EVENTS_SERVICE_TIMEOUT_MILLISECS
61
78
* as provided in publisher init.
62
79
*/
63
80
m_event_service.echo_receive (s);
@@ -82,11 +99,11 @@ EventPublisher::event_publish(const string tag, const event_params_t *params)
82
99
event_data[EVENT_RUNTIME_ID] = m_runtime_id;
83
100
event_data[EVENT_SEQUENCE] = seq_to_str (m_sequence);
84
101
85
- return zmq_message_send (m_event_source, event_data);
102
+ return zmq_message_send (m_socket, m_event_source, event_data);
86
103
}
87
104
88
105
event_handle_t
89
- events_init_publisher (const string event_source, int block_millisecs )
106
+ events_init_publisher (const string event_source)
90
107
{
91
108
event_handle_t ret = NULL ;
92
109
lst_publishers_t ::iterator it = s_publishers.find (event_source);
@@ -97,7 +114,7 @@ events_init_publisher(const string event_source, int block_millisecs)
97
114
else {
98
115
EventPublisher *p = new EventPublisher ();
99
116
100
- int rc = p->init (event_source, block_millisecs );
117
+ int rc = p->init (event_source);
101
118
102
119
if (rc != 0 ) {
103
120
delete p;
@@ -132,10 +149,10 @@ event_publish(event_handle_t handle, const string tag, const event_params_t *par
132
149
{
133
150
for (it=s_publishers.begin (); it != s_publishers.end (); ++it) {
134
151
if (it->second == handle) {
135
- it->second ->event_publish (tag, params);
136
- break ;
152
+ return it->second ->event_publish (tag, params);
137
153
}
138
154
}
155
+ return -1 ;
139
156
}
140
157
141
158
@@ -179,7 +196,9 @@ EventSubscriber::~EventSubscriber()
179
196
break ;
180
197
}
181
198
events.push_back (evt_str);
182
- if (chrono::steady_clock::now () - start > chrono::seconds (2 ))
199
+ chrono::steady_clock::timepoint now = chrono::steady_clock::now ();
200
+ if (chrono::duration_cast<std::chrono::milliseconds>(now - start) >
201
+ CACHE_DRAIN_IN_MILLISECS)
183
202
break ;
184
203
}
185
204
@@ -223,12 +242,9 @@ EventSubscriber::init(bool use_cache, const event_subscribe_sources_t *subs_sour
223
242
}
224
243
225
244
if (use_cache) {
226
- rc = m_event_svc.init_client (m_zmq_ctx);
245
+ rc = m_event_svc.init_client (m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MILLISECS );
227
246
RET_ON_ERR (rc == 0 , " Fails to init the service" );
228
247
229
- /* Shadow the cache SUBS connect request, as it is async */
230
- m_event_svc.send_recv (EVENT_ECHO);
231
-
232
248
if (m_event_svc.cache_stop () == 0 ) {
233
249
// Stopped an active cache
234
250
m_cache_read = true ;
@@ -261,10 +277,12 @@ EventSubscriber::prune_track()
261
277
262
278
263
279
int
264
- EventSubscriber::event_receive (event_str_t &event , int &missed_cnt)
280
+ EventSubscriber::event_receive (string &&key, event_params_t ¶ms , int &missed_cnt)
265
281
{
266
- event.clear ();
267
- while (event.empty ()) {
282
+ key.clear ();
283
+ event_params_t ().swap (params);
284
+
285
+ while (key.empty ()) {
268
286
internal_event_t event_data;
269
287
int rc = 0 ;
270
288
@@ -303,7 +321,16 @@ EventSubscriber::event_receive(event_str_t &event, int &missed_cnt)
303
321
}
304
322
}
305
323
if (missed_cnt >= 0 ) {
306
- event = event_data[EVENT_STR_DATA];
324
+ map_str_str_t ev;
325
+
326
+ deserialize (event_data[EVENT_STR_DATA], ev);
327
+ RET_ON_ERR (ev.size () == 1 , " Expect string (%s) deserialize to one key" ,
328
+ event_data[EVENT_STR_DATA].c_str ());
329
+
330
+ key = ev.begin ()->first ;
331
+
332
+ deserialize (ev.begin ()->second , params);
333
+
307
334
}
308
335
}
309
336
out:
@@ -345,10 +372,11 @@ events_deinit_subscriber(event_handle_t &handle)
345
372
346
373
347
374
int
348
- event_receive (event_handle_t handle, event_str_t &event, int &missed_cnt)
375
+ event_receive (event_handle_t handle, string &key,
376
+ event_params_t ¶ms, int &missed_cnt)
349
377
{
350
378
if ((handle == s_subscriber) && (s_subscriber != NULL )) {
351
- return s_subscriber->event_receive (event , missed_cnt);
379
+ return s_subscriber->event_receive (key, params , missed_cnt);
352
380
}
353
381
return -1 ;
354
382
}
0 commit comments