Skip to content

Commit 98fb238

Browse files
Merge branch 'syslog_telemetry_shared' of https://github.com/renukamanavalan/sonic-swss-common into syslog_telemetry_shared
2 parents 6bebad3 + 48a36ce commit 98fb238

File tree

7 files changed

+503
-78
lines changed

7 files changed

+503
-78
lines changed

common/events.cpp

+190-20
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "events_pi.h"
2+
#include "events_wrap.h"
23

34
/*
45
* Track created publishers to avoid duplicates
@@ -73,6 +74,7 @@ EventPublisher::publish(const string tag, const event_params_t *params)
7374
{
7475
int rc;
7576
internal_event_t event_data;
77+
string key(m_event_source + ":" + tag);
7678

7779
if (m_event_service.is_active()) {
7880
string s;
@@ -107,7 +109,7 @@ EventPublisher::publish(const string tag, const event_params_t *params)
107109
map_to_str(*params).c_str());
108110

109111
{
110-
map_str_str_t event_str_map = { { m_event_source + ":" + tag, param_str}};
112+
map_str_str_t event_str_map = { { key, param_str}};
111113

112114
rc = serialize(event_str_map, event_data[EVENT_STR_DATA]);
113115
RET_ON_ERR(rc == 0, "failed to serialize event str %s",
@@ -119,6 +121,22 @@ EventPublisher::publish(const string tag, const event_params_t *params)
119121

120122
rc = zmq_message_send(m_socket, m_event_source, event_data);
121123
RET_ON_ERR(rc == 0, "failed to send for tag %s", tag.c_str());
124+
125+
{
126+
nlohmann::json msg = nlohmann::json::object();
127+
{
128+
nlohmann::json params_data = nlohmann::json::object();
129+
130+
for (event_params_t::const_iterator itc = params->begin();
131+
itc != params->end(); ++itc) {
132+
params_data[itc->first] = itc->second;
133+
}
134+
msg[key] = params_data;
135+
}
136+
string json_str(msg.dump());
137+
SWSS_LOG_INFO("EVENT_PUBLISHED: %s", json_str.c_str());
138+
}
139+
122140
out:
123141
return rc;
124142
}
@@ -127,10 +145,10 @@ event_handle_t
127145
events_init_publisher(const string event_source)
128146
{
129147
event_handle_t ret = NULL;
130-
lst_publishers_t::iterator it = s_publishers.find(event_source);
131-
if (it != s_publishers.end()) {
148+
lst_publishers_t::const_iterator itc = s_publishers.find(event_source);
149+
if (itc != s_publishers.end()) {
132150
// Pre-exists
133-
ret = it->second;
151+
ret = itc->second;
134152
}
135153
else {
136154
EventPublisher *p = new EventPublisher();
@@ -348,17 +366,15 @@ EventSubscriber::event_receive(string &key, event_params_t &params, int &missed_
348366
/* Find any missed events for this runtime ID */
349367
missed_cnt = 0;
350368
sequence_t seq = str_to_seq(event_data[EVENT_SEQUENCE]);
351-
track_info_t::iterator it = m_track.find(event_data[EVENT_RUNTIME_ID]);
352-
if (it != m_track.end()) {
369+
track_info_t::const_iterator itc = m_track.find(event_data[EVENT_RUNTIME_ID]);
370+
if (itc != m_track.end()) {
353371
/* current seq - last read - 1 == 0 if none missed */
354-
missed_cnt = seq - it->second.seq - 1;
355-
it->second = evt_info_t(seq);
372+
missed_cnt = seq - itc->second.seq - 1;
356373
}
357374
else {
358375
if (m_track.size() > (MAX_PUBLISHERS_COUNT + 10)) {
359376
prune_track();
360377
}
361-
m_track[event_data[EVENT_RUNTIME_ID]] = evt_info_t(seq);
362378
}
363379
if (missed_cnt >= 0) {
364380
map_str_str_t ev;
@@ -376,6 +392,8 @@ EventSubscriber::event_receive(string &key, event_params_t &params, int &missed_
376392
rc = deserialize(ev.begin()->second, params);
377393
RET_ON_ERR(rc == 0, "failed to deserialize params %s",
378394
ev.begin()->second.substr(0, 32).c_str());
395+
396+
m_track[event_data[EVENT_RUNTIME_ID]] = evt_info_t(seq);
379397

380398
}
381399
else {
@@ -395,15 +413,21 @@ event_handle_t
395413
events_init_subscriber(bool use_cache, int recv_timeout,
396414
const event_subscribe_sources_t *sources)
397415
{
416+
EventSubscriber *sub = NULL;
417+
398418
if (s_subscriber == NULL) {
399-
EventSubscriber *sub = new EventSubscriber();
419+
sub = new EventSubscriber();
400420

401421
RET_ON_ERR(sub->init(use_cache, recv_timeout, sources) == 0,
402422
"Failed to init subscriber");
403423

404424
s_subscriber = sub;
425+
sub = NULL;
405426
}
406427
out:
428+
if (sub != NULL) {
429+
delete sub;
430+
}
407431
return s_subscriber;
408432
}
409433

@@ -418,24 +442,170 @@ events_deinit_subscriber(event_handle_t handle)
418442
}
419443

420444

445+
event_receive_op_t
446+
event_receive(event_handle_t handle)
447+
{
448+
event_receive_op_t op;
449+
450+
if ((handle == s_subscriber) && (s_subscriber != NULL)) {
451+
op.rc = s_subscriber->event_receive(op.key, op.params, op.missed_cnt);
452+
}
453+
else {
454+
op.rc = -1;
455+
}
456+
return op;
457+
}
458+
459+
void *
460+
events_init_publisher_wrap(const char *args)
461+
{
462+
SWSS_LOG_DEBUG("events_init_publisher_wrap: args=%s",
463+
(args != NULL ? args : "<null pointer>"));
464+
465+
if (args == NULL) {
466+
return NULL;
467+
}
468+
const auto &data = nlohmann::json::parse(args);
469+
470+
string source;
471+
for (auto it = data.cbegin(); it != data.cend(); ++it) {
472+
if ((it.key() == ARGS_SOURCE) && (*it).is_string()) {
473+
source = it.value();
474+
}
475+
}
476+
return events_init_publisher(source);
477+
}
478+
479+
480+
void
481+
events_deinit_publisher_wrap(void *handle)
482+
{
483+
events_deinit_publisher(handle);
484+
}
485+
421486

422487
int
423-
event_receive(event_handle_t handle, string &key,
424-
event_params_t &params, int &missed_cnt)
488+
event_publish_wrap(void *handle, const char *args)
425489
{
426-
if ((handle == s_subscriber) && (s_subscriber != NULL)) {
427-
return s_subscriber->event_receive(key, params, missed_cnt);
490+
string tag;
491+
event_params_t params;
492+
493+
SWSS_LOG_DEBUG("events_init_publisher_wrap: handle=%p args=%s",
494+
handle, (args != NULL ? args : "<null pointer>"));
495+
496+
if (args == NULL) {
497+
return -1;
428498
}
429-
return -1;
499+
const auto &data = nlohmann::json::parse(args);
500+
501+
for (auto it = data.cbegin(); it != data.cend(); ++it) {
502+
if ((it.key() == ARGS_TAG) && (*it).is_string()) {
503+
tag = it.value();
504+
}
505+
else if ((it.key() == ARGS_PARAMS) && (*it).is_object()) {
506+
const auto &params_data = *it;
507+
for (auto itp = params_data.cbegin(); itp != params_data.cend(); ++itp) {
508+
if ((*itp).is_string()) {
509+
params[itp.key()] = itp.value();
510+
}
511+
}
512+
}
513+
}
514+
return event_publish(handle, tag, &params);
430515
}
431516

517+
void *
518+
events_init_subscriber_wrap(const char *args)
519+
{
520+
bool use_cache = true;
521+
int recv_timeout = -1;
522+
event_subscribe_sources_t sources;
432523

433-
event_receive_op_t
434-
event_receive_wrap(event_handle_t handle)
524+
SWSS_LOG_DEBUG("events_init_subsriber_wrap: args:%s", args);
525+
526+
if (args != NULL) {
527+
const auto &data = nlohmann::json::parse(args);
528+
529+
for (auto it = data.cbegin(); it != data.cend(); ++it) {
530+
if ((it.key() == ARGS_USE_CACHE) && (*it).is_boolean()) {
531+
use_cache = it.value();
532+
}
533+
else if ((it.key() == ARGS_RECV_TIMEOUT) && (*it).is_number_integer()) {
534+
recv_timeout = it.value();
535+
}
536+
}
537+
}
538+
void *handle = events_init_subscriber(use_cache, recv_timeout);
539+
SWSS_LOG_DEBUG("events_init_subscriber_wrap: handle=%p", handle);
540+
return handle;
541+
}
542+
543+
544+
void
545+
events_deinit_subscriber_wrap(void *handle)
435546
{
436-
event_receive_op_t op;
547+
SWSS_LOG_DEBUG("events_deinit_subsriber_wrap: args=%p", handle);
437548

438-
op.rc = event_receive(handle, op.key, op.params, op.missed_cnt);
439-
return op;
549+
events_deinit_subscriber(handle);
440550
}
441551

552+
553+
int
554+
event_receive_wrap(void *handle, char *event_str,
555+
int event_str_sz, char *missed_cnt_str, int missed_cnt_str_sz)
556+
{
557+
event_receive_op_t evt;
558+
int rc = 0;
559+
560+
SWSS_LOG_DEBUG("events_receive_wrap handle=%p event-sz=%d missed-sz=%d\n",
561+
handle, event_str_sz, missed_cnt_str_sz);
562+
563+
evt = event_receive(handle);
564+
565+
if (evt.rc == 0) {
566+
nlohmann::json res = nlohmann::json::object();
567+
568+
{
569+
nlohmann::json params_data = nlohmann::json::object();
570+
571+
for (event_params_t::const_iterator itc = evt.params.begin(); itc != evt.params.end(); ++itc) {
572+
params_data[itc->first] = itc->second;
573+
}
574+
res[evt.key] = params_data;
575+
}
576+
string json_str(res.dump());
577+
rc = snprintf(event_str, event_str_sz, "%s", json_str.c_str());
578+
if (rc >= event_str_sz) {
579+
SWSS_LOG_ERROR("truncated event buffer.need=%d given=%d",
580+
rc, event_str_sz);
581+
event_str[event_str_sz-1] = 0;
582+
}
583+
584+
int rc_missed = snprintf(missed_cnt_str, missed_cnt_str_sz, "%d", evt.missed_cnt);
585+
if (rc_missed >= missed_cnt_str_sz) {
586+
SWSS_LOG_ERROR("missed cnt (%d) buffer.need=%d given=%d",
587+
evt.missed_cnt, rc_missed, missed_cnt_str_sz);
588+
missed_cnt_str[missed_cnt_str_sz-1] = 0;
589+
}
590+
}
591+
else if (evt.rc > 0) {
592+
// timeout
593+
rc = 0;
594+
}
595+
else {
596+
rc = evt.rc;
597+
}
598+
599+
SWSS_LOG_DEBUG("events_receive_wrap rc=%d event_str=%s missed=%s\n",
600+
rc, event_str, missed_cnt_str);
601+
602+
return rc;
603+
}
604+
605+
606+
void swssSetLogPriority(int pri)
607+
{
608+
swss::Logger::setMinPrio((swss::Logger::Priority) pri);
609+
}
610+
611+

common/events.h

+9-31
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ event_handle_t events_init_subscriber(bool use_cache=false,
151151
void events_deinit_subscriber(event_handle_t handle);
152152

153153

154+
typedef struct {
155+
int rc; /* Return value of event_receive */
156+
std::string key; /* key */
157+
event_params_t params; /* Params received */
158+
int missed_cnt; /* missed count */
159+
} event_receive_op_t;
160+
154161
/*
155162
* Receive an event.
156163
* A blocking call unless the subscriber is created with a timeout.
@@ -189,40 +196,11 @@ void events_deinit_subscriber(event_handle_t handle);
189196
*
190197
* return:
191198
* 0 - On success
192-
* > 0 - On failure, returns zmq_errno, if failure is zmq socket related.
199+
* > 0 - Implies failure due to timeout.
193200
* < 0 - For all other failures
194201
*
195202
*/
196-
int event_receive(event_handle_t handle, std::string &key,
197-
event_params_t &params, int &missed_cnt);
198-
199-
/*
200-
* event_receive_wrap
201-
*
202-
* Returns o/p as structured.
203-
* This is handy for invocation via python.
204-
*
205-
* input:
206-
* handle - As obtained from events_init_subscriber
207-
*
208-
* output:
209-
* None
210-
*
211-
* Return:
212-
* struct that gets return value and all o/p params of event_receive
213-
*/
214-
typedef struct {
215-
int rc; /* Return value of event_receive */
216-
/* o/p params from event receive */
217-
std::string key;
218-
event_params_t params;
219-
int missed_cnt;
220-
} event_receive_op_t;
221-
222-
event_receive_op_t event_receive_wrap(event_handle_t handle);
203+
event_receive_op_t event_receive(event_handle_t handle);
223204

224-
/* Non ZMQ Error codes */
225-
#define ERR_MESSAGE_INVALID -2
226-
#define ERR_OTHER -1
227205

228206
#endif /* !_EVENTS_H */

common/events_common.h

+14-2
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,28 @@
1818
#include <boost/archive/text_oarchive.hpp>
1919

2020
#include "logger.h"
21-
#include "events.h"
2221

2322
using namespace std;
2423
using namespace chrono;
2524

25+
#define ERR_MESSAGE_INVALID -2
26+
#define ERR_OTHER -1
27+
2628
/*
2729
* Max count of possible concurrent event publishers
2830
* We maintain a cache of last seen sequence number per publisher.
2931
* This provides a MAX ceiling for cache.
30-
* Any more publishers over this count should indicate a serious bug.
32+
* A publisher exiting will be not known to receiver, so some publishers
33+
* who gets periodically invoked as every N seconds, could over time
34+
* cause the cache over flow.
35+
* whenever the cache hits this max, old instances are removed.
36+
*
37+
* RFE: Publishers who publish at least 1 event, declare their
38+
* exit via a reserved event, so receivers could drop tracking
39+
* hence avoid overflow. Else, a long running process with no event
40+
* but still active could lose its record. This is still not very
41+
* serious, as first message from such publisher is less likely
42+
* to be lost and it could create a record.
3143
*/
3244
#define MAX_PUBLISHERS_COUNT 1000
3345

0 commit comments

Comments
 (0)