Skip to content

Commit b0d7ee0

Browse files
Made zmq send & receive thread safe
1 parent ce606d4 commit b0d7ee0

File tree

6 files changed

+195
-179
lines changed

6 files changed

+195
-179
lines changed

common/events.cpp

+3-10
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,8 @@ EventSubscriber::~EventSubscriber()
215215
internal_event_t evt_data;
216216

217217
rc = zmq_message_read(m_socket, ZMQ_DONTWAIT, source, evt_data);
218-
if (rc == -1) {
219-
if (zerrno == EAGAIN) {
220-
rc = 0;
221-
}
218+
if (rc != 0) {
219+
/* Break on any failure, including EAGAIN */
222220
break;
223221
}
224222

@@ -344,7 +342,7 @@ EventSubscriber::event_receive(string &key, event_params_t &params, int &missed_
344342
/* Read from SUBS channel */
345343
string evt_source;
346344
rc = zmq_message_read(m_socket, 0, evt_source, event_data);
347-
RET_ON_ERR(rc == 0, "Failed to read message from sock");
345+
RET_ON_ERR(rc == 0, "Failed to read message from sock rc=%d", rc);
348346
}
349347

350348
/* Find any missed events for this runtime ID */
@@ -441,8 +439,3 @@ event_receive_wrap(event_handle_t handle)
441439
return op;
442440
}
443441

444-
int event_last_error()
445-
{
446-
return zerrno;
447-
}
448-

common/events.h

+9-13
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,9 @@ typedef std::map<std::string, std::string> event_params_t;
9696
* e.g. "2022-08-17T02:39:21.286611Z"
9797
*
9898
* return:
99-
* 0 - On success
100-
* -1 - On failure.
99+
* 0 - On success
100+
* > 0 - On failure, returns zmq_errno, if failure is zmq socket related.
101+
* < 0 - For all other failures
101102
*/
102103
int event_publish(event_handle_t handle, const std::string event_tag,
103104
const event_params_t *params=NULL);
@@ -187,8 +188,9 @@ void events_deinit_subscriber(event_handle_t handle);
187188
* missed count from all received events will give the total missed.
188189
*
189190
* return:
190-
* 0 - On success
191-
* -1 - On failure. The handle is not valid or upon receive timeout.
191+
* 0 - On success
192+
* > 0 - On failure, returns zmq_errno, if failure is zmq socket related.
193+
* < 0 - For all other failures
192194
*
193195
*/
194196
int event_receive(event_handle_t handle, std::string &key,
@@ -219,14 +221,8 @@ typedef struct {
219221

220222
event_receive_op_t event_receive_wrap(event_handle_t handle);
221223

222-
223-
/*
224-
* Get error code for last receive
225-
*
226-
* Set to EAGAIN on timeout
227-
* Any other value implies fatal error.
228-
*
229-
*/
230-
int event_last_error();
224+
/* Non ZMQ Error codes */
225+
#define ERR_MESSAGE_INVALID -2
226+
#define ERR_OTHER -1
231227

232228
#endif /* !_EVENTS_H */

common/events_common.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#include "events_common.h"
22

3-
int zerrno = 0;
43
int running_ut = 0;
54

65
/*

0 commit comments

Comments
 (0)