Skip to content

Commit 9d64634

Browse files
events 75% covered by UT
1 parent 67fbe19 commit 9d64634

File tree

6 files changed

+302
-15
lines changed

6 files changed

+302
-15
lines changed

common/events.cpp

+26-7
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,17 @@
1616
*
1717
* The unit is in milliseconds in sync with ZMQ_RCVTIMEO of
1818
* zmq_setsockopt.
19+
*
20+
* Publisher uses more to shadow async connectivity from PUB to
21+
* XSUB end point of eventd's proxy. Hene have a less value.
22+
*
23+
* Subscriber uses it for cache management and here we need a
24+
* longer timeout, to handle slow proxy. This timeout value's only
25+
* impact could be subscriber process trying to terminate.
1926
*/
2027

21-
#define EVENTS_SERVICE_TIMEOUT_MILLISECS 200
28+
#define EVENTS_SERVICE_TIMEOUT_MS_PUB 200
29+
#define EVENTS_SERVICE_TIMEOUT_MS_SUB 2000
2230

2331
/*
2432
* Track created publishers to avoid duplicates
@@ -50,7 +58,7 @@ int EventPublisher::init(const string event_source)
5058
* Event service could be down. So have a timeout.
5159
*
5260
*/
53-
rc = m_event_service.init_client(m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MILLISECS);
61+
rc = m_event_service.init_client(m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MS_PUB);
5462
RET_ON_ERR (rc == 0, "Failed to init event service");
5563

5664
rc = m_event_service.echo_send("hello");
@@ -87,7 +95,7 @@ EventPublisher::publish(const string tag, const event_params_t *params)
8795
string s;
8896

8997
/* Failure is no-op; The eventd service my be down
90-
* NOTE: This call atmost blocks for EVENTS_SERVICE_TIMEOUT_MILLISECS
98+
* NOTE: This call atmost blocks for EVENTS_SERVICE_TIMEOUT_MS_PUB
9199
* as provided in publisher init.
92100
*/
93101
m_event_service.echo_receive(s);
@@ -240,7 +248,8 @@ EventSubscriber::~EventSubscriber()
240248

241249

242250
int
243-
EventSubscriber::init(bool use_cache, const event_subscribe_sources_t *subs_sources)
251+
EventSubscriber::init(bool use_cache, int recv_timeout,
252+
const event_subscribe_sources_t *subs_sources)
244253
{
245254
/*
246255
* Initiate SUBS connection to XPUB end point.
@@ -268,8 +277,13 @@ EventSubscriber::init(bool use_cache, const event_subscribe_sources_t *subs_sour
268277
}
269278
}
270279

280+
if (recv_timeout != -1) {
281+
rc = zmq_setsockopt (m_socket, ZMQ_RCVTIMEO, &recv_timeout, sizeof (recv_timeout));
282+
RET_ON_ERR(rc == 0, "Failed to ZMQ_RCVTIMEO to %d", recv_timeout);
283+
}
284+
271285
if (use_cache) {
272-
rc = m_event_service.init_client(m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MILLISECS);
286+
rc = m_event_service.init_client(m_zmq_ctx, EVENTS_SERVICE_TIMEOUT_MS_SUB);
273287
RET_ON_ERR(rc == 0, "Fails to init the service");
274288

275289
if (m_event_service.cache_stop() == 0) {
@@ -379,12 +393,13 @@ EventSubscriber::event_receive(string &key, event_params_t &params, int &missed_
379393
static EventSubscriber *s_subscriber = NULL;
380394

381395
event_handle_t
382-
events_init_subscriber(bool use_cache, const event_subscribe_sources_t *sources)
396+
events_init_subscriber(bool use_cache, int recv_timeout,
397+
const event_subscribe_sources_t *sources)
383398
{
384399
if (s_subscriber == NULL) {
385400
EventSubscriber *p = new EventSubscriber();
386401

387-
RET_ON_ERR(p->init(use_cache, sources) == 0,
402+
RET_ON_ERR(p->init(use_cache, recv_timeout, sources) == 0,
388403
"Failed to init subscriber");
389404

390405
s_subscriber = p;
@@ -416,4 +431,8 @@ event_receive(event_handle_t handle, string &key,
416431
return -1;
417432
}
418433

434+
int event_last_error()
435+
{
436+
return recv_last_err;
437+
}
419438

common/events.h

+19-1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,13 @@ typedef std::vector<std::string> event_subscribe_sources_t;
113113
* The cache service caches events during session down time. The deinit
114114
* start the caching and init call stops the caching.
115115
*
116+
* recv_timeout
117+
* Read blocks by default until an event is available for read.
118+
* 0 - Returns immediately, with or w/o event
119+
* -1 - Default; Blocks until an event is available for read
120+
* N - Count ofd milliseconds to wait for an event.
121+
*
122+
*
116123
* lst_subscribe_sources_t
117124
* List of subscription sources of interest.
118125
* The source value is the corresponding YANG module name.
@@ -123,6 +130,7 @@ typedef std::vector<std::string> event_subscribe_sources_t;
123130
* NULL on failure
124131
*/
125132
event_handle_t events_init_subscriber(bool use_cache=false,
133+
int recv_timeout = -1,
126134
const event_subscribe_sources_t *sources=NULL);
127135

128136
/*
@@ -174,13 +182,23 @@ void events_deinit_subscriber(event_handle_t &handle);
174182
*
175183
* return:
176184
* 0 - On success
177-
* -1 - On failure. The handle is not valid.
185+
* -1 - On failure. The handle is not valid or upon receive timeout.
178186
*
179187
*/
180188
int event_receive(event_handle_t handle, std::string &key,
181189
event_params_t &params, int &missed_cnt);
182190

183191

192+
/*
193+
* Get error code for last receive
194+
*
195+
* Set to EAGAIN on timeout
196+
* Any other value implies fatal error.
197+
*
198+
*/
199+
int event_last_error();
200+
201+
184202
/*
185203
* Cache drain timeout.
186204
*

common/events_common.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "events_common.h"
22

33
int zerrno = 0;
4+
int recv_last_err = 0;
45
int running_ut = 0;
56

67
/*

common/events_common.h

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ using namespace chrono;
2525

2626
extern int errno;
2727
extern int zerrno;
28+
extern int recv_last_err;
2829

2930
/*
3031
* Max count of possible concurrent event publishers
@@ -287,6 +288,7 @@ zmq_read_part(void *sock, int flag, int &more, DT &data)
287288
more = 0;
288289
zmq_msg_init(&msg);
289290
int rc = zmq_msg_recv(&msg, sock, flag);
291+
recv_last_err = zerrno;
290292

291293
if (rc != -1) {
292294
size_t more_size = sizeof (more);

common/events_pi.h

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ class EventSubscriber : public events_base
9393
EventSubscriber();
9494

9595
int init(bool use_cache=false,
96+
int recv_timeout= -1,
9697
const event_subscribe_sources_t *subs_sources=NULL);
9798

9899
virtual ~EventSubscriber();

0 commit comments

Comments
 (0)