1
1
#include " events_pi.h"
2
2
3
- /*
4
- * The uuid_unparse() function converts the supplied UUID uu from
5
- * the binary representation into a 36-byte string (plus trailing
6
- * '\0') of the form 1b4e28ba-2fa1-11d2-883f-0016d3cca427 and stores
7
- * this value in the character string pointed to by out.
8
- */
9
- #define UUID_STR_SIZE 40
10
-
11
- /*
12
- * Publisher use echo service and subscriber uses cache service
13
- * The eventd process runs this service, which could be down
14
- * All service interactions being async, a timeout is required
15
- * not to block forever on read.
16
- *
17
- * The unit is in milliseconds in sync with ZMQ_RCVTIMEO of
18
- * zmq_setsockopt.
19
- *
20
- * Publisher uses more to shadow async connectivity from PUB to
21
- * XSUB end point of eventd's proxy. Hene have a less value.
22
- *
23
- * Subscriber uses it for cache management and here we need a
24
- * longer timeout, to handle slow proxy. This timeout value's only
25
- * impact could be subscriber process trying to terminate.
26
- */
27
-
28
- #define EVENTS_SERVICE_TIMEOUT_MS_PUB 200
29
- #define EVENTS_SERVICE_TIMEOUT_MS_SUB 2000
30
-
31
3
/*
32
4
* Track created publishers to avoid duplicates
33
- * As we track missed event count by publishing instances, avoiding
5
+ * As receivers track missed event count by publishing instances, avoiding
34
6
* duplicates helps reduce load.
35
7
*/
36
8
@@ -50,27 +22,33 @@ int EventPublisher::init(const string event_source)
50
22
int rc = zmq_connect (sock, get_config (XSUB_END_KEY).c_str ());
51
23
RET_ON_ERR (rc == 0 , " Publisher fails to connect %s" , get_config (XSUB_END_KEY).c_str ());
52
24
53
- // REQ socket is connected and a message is sent & received, more to
54
- // ensure PUB socket had enough time to establish connection.
55
- // Any message published before connection establishment is dropped.
56
- //
57
25
/*
58
26
* Event service could be down. So have a timeout.
59
27
*
60
28
*/
61
29
rc = m_event_service.init_client (m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MS_PUB);
62
30
RET_ON_ERR (rc == 0 , " Failed to init event service" );
63
31
32
+ /*
33
+ * REQ socket is connected and a message is sent & received, more to
34
+ * ensure PUB socket had enough time to establish connection.
35
+ * Any message published before connection establishment is dropped.
36
+ * NOTE: We don't wait for response here, but read it upon first publish
37
+ * If the publisher init happened early at the start by caller, by the
38
+ * the first event is published, the echo response will be available locally.
39
+ */
64
40
rc = m_event_service.echo_send (" hello" );
65
41
RET_ON_ERR (rc == 0 , " Failed to echo send in event service" );
66
42
67
43
m_event_source = event_source;
68
44
45
+ {
69
46
uuid_t id;
70
47
char uuid_str[UUID_STR_SIZE];
71
48
uuid_generate (id);
72
49
uuid_unparse (id, uuid_str);
73
50
m_runtime_id = string (uuid_str);
51
+ }
74
52
75
53
m_socket = sock;
76
54
out:
@@ -104,9 +82,12 @@ EventPublisher::publish(const string tag, const event_params_t *params)
104
82
* as provided in publisher init.
105
83
*/
106
84
m_event_service.echo_receive (s);
85
+
86
+ /* Close it as we don't need it anymore */
107
87
m_event_service.close_service ();
108
88
}
109
89
90
+ /* Check for timestamp in params. If not, provide it. */
110
91
string param_str;
111
92
event_params_t evt_params;
112
93
if (params != NULL ) {
@@ -120,6 +101,7 @@ EventPublisher::publish(const string tag, const event_params_t *params)
120
101
evt_params[event_ts_param] = get_timestamp ();
121
102
params = &evt_params;
122
103
}
104
+
123
105
rc = serialize (*params, param_str);
124
106
RET_ON_ERR (rc == 0 , " failed to serialize params %s" ,
125
107
map_to_str (*params).c_str ());
@@ -232,9 +214,12 @@ EventSubscriber::~EventSubscriber()
232
214
rc = zmq_message_read (m_socket, ZMQ_DONTWAIT, source, evt_data);
233
215
if (rc == -1 ) {
234
216
if (zerrno == EAGAIN) {
235
- rc = 0 ;
217
+ /* Try again after a small pause */
218
+ this_thread::sleep_for (chrono::milliseconds (10 ));
219
+ }
220
+ else {
221
+ break ;
236
222
}
237
- break ;
238
223
}
239
224
serialize (evt_data, evt_str);
240
225
events.push_back (evt_str);
@@ -315,10 +300,12 @@ EventSubscriber::prune_track()
315
300
{
316
301
map<time_t , vector<runtime_id_t > > lst;
317
302
303
+ /* Sort entries by last touched time */
318
304
for (const auto e: m_track) {
319
305
lst[e.second .epoch_secs ].push_back (e.first );
320
306
}
321
307
308
+ /* By default it walks from lowest value / earliest timestamp */
322
309
map<time_t , vector<runtime_id_t > >::const_iterator itc = lst.begin ();
323
310
for (; (itc != lst.end ()) && (m_track.size () > MAX_PUBLISHERS_COUNT); ++itc) {
324
311
for (const auto r: itc->second ) {
0 commit comments