7
7
* duplicates helps reduce load.
8
8
*/
9
9
10
- typedef map <string, EventPublisher *> lst_publishers_t ;
11
- lst_publishers_t s_publishers;
10
+ lst_publishers_t EventPublisher::s_publishers;
11
+
12
+ event_handle_t
13
+ EventPublisher::get_publisher (const string event_source)
14
+ {
15
+ event_handle_t ret = NULL ;
16
+ lst_publishers_t ::const_iterator itc = s_publishers.find (event_source);
17
+ if (itc != s_publishers.end ()) {
18
+ // Pre-exists
19
+ ret = itc->second ;
20
+ }
21
+ else {
22
+ EventPublisher_ptr_t p (n.get ()ew EventPublisher ());
23
+
24
+ int rc = p->init (event_source);
25
+
26
+ if (rc == 0 ) {
27
+ ret = p.get ();
28
+ s_publishers[event_source] = p;
29
+ }
30
+ }
31
+ return ret;
32
+ }
33
+
34
+ void
35
+ EventPublisher::drop_publisher (event_handle_t handle)
36
+ {
37
+ lst_publishers_t ::iterator it;
38
+
39
+ for (it=s_publishers.begin (); it != s_publishers.end (); ++it) {
40
+ if (it->second .get () == handle) {
41
+ s_publishers.erase (it);
42
+ break ;
43
+ }
44
+ }
45
+ }
12
46
13
47
EventPublisher::EventPublisher () :
14
48
m_zmq_ctx(NULL ), m_socket(NULL ), m_sequence(0 )
15
49
{
16
50
}
17
51
52
+ static string
53
+ get_uuid ()
54
+ {
55
+ uuid_t id;
56
+ char uuid_str[UUID_STR_SIZE];
57
+ uuid_generate (id);
58
+ uuid_unparse (id, uuid_str);
59
+ return string (uuid_str);
60
+ }
61
+
18
62
int EventPublisher::init (const string event_source)
19
63
{
20
64
m_zmq_ctx = zmq_ctx_new ();
@@ -43,13 +87,7 @@ int EventPublisher::init(const string event_source)
43
87
44
88
m_event_source = event_source;
45
89
46
- {
47
- uuid_t id;
48
- char uuid_str[UUID_STR_SIZE];
49
- uuid_generate (id);
50
- uuid_unparse (id, uuid_str);
51
- m_runtime_id = string (uuid_str);
52
- }
90
+ m_runtime_id = get_uuid ();
53
91
54
92
m_socket = sock;
55
93
out:
@@ -144,40 +182,13 @@ EventPublisher::publish(const string tag, const event_params_t *params)
144
182
event_handle_t
145
183
events_init_publisher (const string event_source)
146
184
{
147
- event_handle_t ret = NULL ;
148
- lst_publishers_t ::const_iterator itc = s_publishers.find (event_source);
149
- if (itc != s_publishers.end ()) {
150
- // Pre-exists
151
- ret = itc->second ;
152
- }
153
- else {
154
- EventPublisher *p = new EventPublisher ();
155
-
156
- int rc = p->init (event_source);
157
-
158
- if (rc != 0 ) {
159
- delete p;
160
- }
161
- else {
162
- ret = p;
163
- s_publishers[event_source] = p;
164
- }
165
- }
166
- return ret;
185
+ return EventPublisher::get_publisher (event_source);
167
186
}
168
187
169
188
void
170
189
events_deinit_publisher (event_handle_t handle)
171
190
{
172
- lst_publishers_t ::iterator it;
173
-
174
- for (it=s_publishers.begin (); it != s_publishers.end (); ++it) {
175
- if (it->second == handle) {
176
- delete it->second ;
177
- s_publishers.erase (it);
178
- break ;
179
- }
180
- }
191
+ EventPublisher::drop_publisher (handle);
181
192
}
182
193
183
194
int
@@ -193,6 +204,34 @@ event_publish(event_handle_t handle, const string tag, const event_params_t *par
193
204
}
194
205
195
206
207
+ event_handle_t
208
+ EventSubscriber::get_subscriber (bool use_cache, int recv_timeout,
209
+ const event_subscribe_sources_t *sources)
210
+ {
211
+
212
+ if (s_subscriber == NULL ) {
213
+ EventSubscriber_ptr_t sub (new EventSubscriber ());
214
+
215
+ RET_ON_ERR (sub->init (use_cache, recv_timeout, sources) == 0 ,
216
+ " Failed to init subscriber" );
217
+
218
+ s_subscriber = sub;
219
+ }
220
+ out:
221
+ return s_subscriber.get ();
222
+ }
223
+
224
+
225
+ void
226
+ EventSubscriber::drop_subscriber (event_handle_t handle)
227
+ {
228
+ if ((handle == s_subscriber) && (s_subscriber != NULL )) {
229
+ delete s_subscriber;
230
+ s_subscriber = NULL ;
231
+ }
232
+ }
233
+
234
+
196
235
EventSubscriber::EventSubscriber () : m_zmq_ctx(NULL ), m_socket(NULL ),
197
236
m_cache_read(false )
198
237
{};
@@ -407,41 +446,21 @@ EventSubscriber::event_receive(string &key, event_params_t ¶ms, int &missed_
407
446
408
447
409
448
/* Expect only one subscriber per process */
410
- static EventSubscriber *s_subscriber = NULL ;
449
+ EventSubscriber *EventSubscriber:: s_subscriber = NULL ;
411
450
412
451
event_handle_t
413
452
events_init_subscriber (bool use_cache, int recv_timeout,
414
453
const event_subscribe_sources_t *sources)
415
454
{
416
- EventSubscriber *sub = NULL ;
417
-
418
- if (s_subscriber == NULL ) {
419
- sub = new EventSubscriber ();
420
-
421
- RET_ON_ERR (sub->init (use_cache, recv_timeout, sources) == 0 ,
422
- " Failed to init subscriber" );
423
-
424
- s_subscriber = sub;
425
- sub = NULL ;
426
- }
427
- out:
428
- if (sub != NULL ) {
429
- delete sub;
430
- }
431
- return s_subscriber;
455
+ return EventSubscriber::get_subscriber (use_cache, recv_timeout, sources);
432
456
}
433
457
434
-
435
458
void
436
459
events_deinit_subscriber (event_handle_t handle)
437
460
{
438
- if ((handle == s_subscriber) && (s_subscriber != NULL )) {
439
- delete s_subscriber;
440
- s_subscriber = NULL ;
441
- }
461
+ EventSubscriber::drop_subscriber (handle);
442
462
}
443
463
444
-
445
464
event_receive_op_t
446
465
event_receive (event_handle_t handle)
447
466
{
0 commit comments