1
1
#include " events_pi.h"
2
+ #include " events_wrap.h"
2
3
3
4
/*
4
5
* Track created publishers to avoid duplicates
@@ -73,6 +74,7 @@ EventPublisher::publish(const string tag, const event_params_t *params)
73
74
{
74
75
int rc;
75
76
internal_event_t event_data;
77
+ string key (m_event_source + " :" + tag);
76
78
77
79
if (m_event_service.is_active ()) {
78
80
string s;
@@ -107,7 +109,7 @@ EventPublisher::publish(const string tag, const event_params_t *params)
107
109
map_to_str (*params).c_str ());
108
110
109
111
{
110
- map_str_str_t event_str_map = { { m_event_source + " : " + tag , param_str}};
112
+ map_str_str_t event_str_map = { { key , param_str}};
111
113
112
114
rc = serialize (event_str_map, event_data[EVENT_STR_DATA]);
113
115
RET_ON_ERR (rc == 0 , " failed to serialize event str %s" ,
@@ -119,6 +121,22 @@ EventPublisher::publish(const string tag, const event_params_t *params)
119
121
120
122
rc = zmq_message_send (m_socket, m_event_source, event_data);
121
123
RET_ON_ERR (rc == 0 , " failed to send for tag %s" , tag.c_str ());
124
+
125
+ {
126
+ nlohmann::json msg = nlohmann::json::object ();
127
+ {
128
+ nlohmann::json params_data = nlohmann::json::object ();
129
+
130
+ for (event_params_t ::const_iterator itc = params->begin ();
131
+ itc != params->end (); ++itc) {
132
+ params_data[itc->first ] = itc->second ;
133
+ }
134
+ msg[key] = params_data;
135
+ }
136
+ string json_str (msg.dump ());
137
+ SWSS_LOG_INFO (" EVENT_PUBLISHED: %s" , json_str.c_str ());
138
+ }
139
+
122
140
out:
123
141
return rc;
124
142
}
@@ -418,24 +436,170 @@ events_deinit_subscriber(event_handle_t handle)
418
436
}
419
437
420
438
439
+ event_receive_op_t
440
+ event_receive (event_handle_t handle)
441
+ {
442
+ event_receive_op_t op;
443
+
444
+ if ((handle == s_subscriber) && (s_subscriber != NULL )) {
445
+ op.rc = s_subscriber->event_receive (op.key , op.params , op.missed_cnt );
446
+ }
447
+ else {
448
+ op.rc = -1 ;
449
+ }
450
+ return op;
451
+ }
452
+
453
+ void *
454
+ events_init_publisher_wrap (const char *args)
455
+ {
456
+ SWSS_LOG_DEBUG (" events_init_publisher_wrap: args=%s" ,
457
+ (args != NULL ? args : " <null pointer>" ));
458
+
459
+ if (args == NULL ) {
460
+ return NULL ;
461
+ }
462
+ const auto &data = nlohmann::json::parse (args);
463
+
464
+ string source;
465
+ for (auto it = data.cbegin (); it != data.cend (); ++it) {
466
+ if ((it.key () == ARGS_SOURCE) && (*it).is_string ()) {
467
+ source = it.value ();
468
+ }
469
+ }
470
+ return events_init_publisher (source);
471
+ }
472
+
473
+
474
+ void
475
+ events_deinit_publisher_wrap (void *handle)
476
+ {
477
+ events_deinit_publisher (handle);
478
+ }
479
+
421
480
422
481
int
423
- event_receive (event_handle_t handle, string &key,
424
- event_params_t ¶ms, int &missed_cnt)
482
+ event_publish_wrap (void *handle, const char *args)
425
483
{
426
- if ((handle == s_subscriber) && (s_subscriber != NULL )) {
427
- return s_subscriber->event_receive (key, params, missed_cnt);
484
+ string tag;
485
+ event_params_t params;
486
+
487
+ SWSS_LOG_DEBUG (" events_init_publisher_wrap: handle=%p args=%s" ,
488
+ handle, (args != NULL ? args : " <null pointer>" ));
489
+
490
+ if (args == NULL ) {
491
+ return -1 ;
428
492
}
429
- return -1 ;
493
+ const auto &data = nlohmann::json::parse (args);
494
+
495
+ for (auto it = data.cbegin (); it != data.cend (); ++it) {
496
+ if ((it.key () == ARGS_TAG) && (*it).is_string ()) {
497
+ tag = it.value ();
498
+ }
499
+ else if ((it.key () == ARGS_PARAMS) && (*it).is_object ()) {
500
+ const auto ¶ms_data = *it;
501
+ for (auto itp = params_data.cbegin (); itp != params_data.cend (); ++itp) {
502
+ if ((*itp).is_string ()) {
503
+ params[itp.key ()] = itp.value ();
504
+ }
505
+ }
506
+ }
507
+ }
508
+ return event_publish (handle, tag, ¶ms);
509
+ }
510
+
511
+ void *
512
+ events_init_subscriber_wrap (const char *args)
513
+ {
514
+ bool use_cache = true ;
515
+ int recv_timeout = -1 ;
516
+ event_subscribe_sources_t sources;
517
+
518
+ SWSS_LOG_DEBUG (" events_init_subsriber_wrap: args:%s" , args);
519
+
520
+ if (args != NULL ) {
521
+ const auto &data = nlohmann::json::parse (args);
522
+
523
+ for (auto it = data.cbegin (); it != data.cend (); ++it) {
524
+ if ((it.key () == ARGS_USE_CACHE) && (*it).is_boolean ()) {
525
+ use_cache = it.value ();
526
+ }
527
+ else if ((it.key () == ARGS_RECV_TIMEOUT) && (*it).is_number_integer ()) {
528
+ recv_timeout = it.value ();
529
+ }
530
+ }
531
+ }
532
+ void *handle = events_init_subscriber (use_cache, recv_timeout);
533
+ SWSS_LOG_DEBUG (" events_init_subscriber_wrap: handle=%p" , handle);
534
+ return handle;
430
535
}
431
536
432
537
433
- event_receive_op_t
434
- event_receive_wrap ( event_handle_t handle)
538
+ void
539
+ events_deinit_subscriber_wrap ( void * handle)
435
540
{
436
- event_receive_op_t op ;
541
+ SWSS_LOG_DEBUG ( " events_deinit_subsriber_wrap: args=%p " , handle) ;
437
542
438
- op.rc = event_receive (handle, op.key , op.params , op.missed_cnt );
439
- return op;
543
+ events_deinit_subscriber (handle);
440
544
}
441
545
546
+
547
+ int
548
+ event_receive_wrap (void *handle, char *event_str,
549
+ int event_str_sz, char *missed_cnt_str, int missed_cnt_str_sz)
550
+ {
551
+ event_receive_op_t evt;
552
+ int rc = 0 ;
553
+
554
+ SWSS_LOG_DEBUG (" events_receive_wrap handle=%p event-sz=%d missed-sz=%d\n " ,
555
+ handle, event_str_sz, missed_cnt_str_sz);
556
+
557
+ evt = event_receive (handle);
558
+
559
+ if (evt.rc == 0 ) {
560
+ nlohmann::json res = nlohmann::json::object ();
561
+
562
+ {
563
+ nlohmann::json params_data = nlohmann::json::object ();
564
+
565
+ for (event_params_t ::const_iterator itc = evt.params .begin (); itc != evt.params .end (); ++itc) {
566
+ params_data[itc->first ] = itc->second ;
567
+ }
568
+ res[evt.key ] = params_data;
569
+ }
570
+ string json_str (res.dump ());
571
+ rc = snprintf (event_str, event_str_sz, " %s" , json_str.c_str ());
572
+ if (rc >= event_str_sz) {
573
+ SWSS_LOG_ERROR (" truncated event buffer.need=%d given=%d" ,
574
+ rc, event_str_sz);
575
+ event_str[event_str_sz-1 ] = 0 ;
576
+ }
577
+
578
+ int rc_missed = snprintf (missed_cnt_str, missed_cnt_str_sz, " %d" , evt.missed_cnt );
579
+ if (rc_missed >= missed_cnt_str_sz) {
580
+ SWSS_LOG_ERROR (" missed cnt (%d) buffer.need=%d given=%d" ,
581
+ evt.missed_cnt , rc_missed, missed_cnt_str_sz);
582
+ missed_cnt_str[missed_cnt_str_sz-1 ] = 0 ;
583
+ }
584
+ }
585
+ else if (evt.rc > 0 ) {
586
+ // timeout
587
+ rc = 0 ;
588
+ }
589
+ else {
590
+ rc = evt.rc ;
591
+ }
592
+
593
+ SWSS_LOG_DEBUG (" events_receive_wrap rc=%d event_str=%s missed=%s\n " ,
594
+ rc, event_str, missed_cnt_str);
595
+
596
+ return rc;
597
+ }
598
+
599
+
600
+ void swssSetLogPriority (int pri)
601
+ {
602
+ swss::Logger::setMinPrio ((swss::Logger::Priority) pri);
603
+ }
604
+
605
+
0 commit comments