Skip to content

Commit d883c01

Browse files
first cut
1 parent 84dbd93 commit d883c01

File tree

3 files changed

+349
-0
lines changed

3 files changed

+349
-0
lines changed

common/events.h

+158
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Events library
3+
*
4+
* APIs are for publishing & receiving events with source, tag and params along with timestamp.
5+
*
6+
*/
7+
8+
9+
class events_base;
10+
11+
typedef events_base* event_handle_t;
12+
13+
/*
14+
* Initialize an event publisher instance for an event source.
15+
*
16+
* A single publisher instance is maintained for a source.
17+
* Any duplicate init call for a source will return the same instance.
18+
*
19+
* Choosing cache will help read cached data, during downtime, if any.
20+
*
21+
* NOTE:
22+
* The initialization occurs asynchronously.
23+
* Any event published before init is complete, is blocked until the init
24+
* is complete. Hence recommend, do the init as soon as the process starts.
25+
*
26+
* Input:
27+
* event_source
28+
* The YANG module name for the event source. All events published with the handle
29+
* returned by this call is tagged with this source, transparently. The receiver
30+
* could subscribe with this source as filter.
31+
* Return
32+
* Non NULL handle
33+
* NULL on failure
34+
*/
35+
36+
event_handle_t events_init_publisher(std::string &event_source);
37+
38+
/*
39+
* De-init/free the publisher
40+
*
41+
* Input:
42+
* Handle returned from events_init_publisher
43+
*
44+
* Output:
45+
* None
46+
*/
47+
void events_deinit_publisher(event_handle_t &handle);
48+
49+
50+
/*
51+
* List of event params
52+
*/
53+
typedef std::map<std::string, std::string> event_params_t;
54+
55+
/*
56+
* Publish an event
57+
*
58+
* Internally a globally unique sequence number is embedded in every published event,
59+
* The sequence numbers from same publishing instances can be compared
60+
* to see if there any missing events between.
61+
*
62+
* The sequence has two components as run-time-id that distinguishes
63+
* the running instance of a publisher and other a running sequence
64+
* starting from 0, which is local to this runtime-id.
65+
*
66+
* The receiver API keep next last received number for each runtime id
67+
* and use this info to compute missed event count upon next event.
68+
*
69+
* input:
70+
* handle - As obtained from events_init_publisher for a event-source.
71+
*
72+
* event_tag -
73+
* Name of the YANG container that defines this event in the
74+
* event-source module associated with this handle.
75+
*
76+
* YANG path formatted as "< event_source >:< event_tag >"
77+
* e.g. {"sonic-events-bgp:bgp-state": { "ip": "10.10.10.10", ...}}
78+
*
79+
* params -
80+
* Params associated with event; This may or may not contain
81+
* timestamp. In the absence, the timestamp is added, transparently.
82+
*
83+
*/
84+
void event_publish(event_handle_t handle, const std:string &event_tag,
85+
const event_params_t *params=NULL);
86+
87+
88+
89+
typedef std::vector<std::string> event_subscribe_sources_t;
90+
91+
/*
92+
* Initialize subscriber.
93+
* Init subscriber, optionally to filter by event-source.
94+
*
95+
* Input:
96+
* use_cache
97+
* When set to true, it will make use of the cache service transparently.
98+
* The cache service caches events during session down time (last deinit to this
99+
* init call).
100+
*
101+
* lst_subscribe_sources_t
102+
* List of subscription sources of interest.
103+
* The source value is the corresponding YANG module name.
104+
* e.g. "sonic-events-bgp " is the source modulr name for bgp.
105+
*
106+
* Return:
107+
* Non NULL handle on success
108+
* NULL on failure
109+
*/
110+
event_handle_t events_init_subscriber(bool use_cache=false,
111+
const event_subscribe_sources_t *sources=NULL);
112+
113+
/*
114+
* De-init/free the subscriber
115+
*
116+
* Input:
117+
* Handle returned from events_init_subscriber
118+
*
119+
* Output:
120+
* None
121+
*/
122+
void events_deinit_subscriber(event_handle_t &handle);
123+
124+
/*
125+
* Received event as JSON string as
126+
* < YANG path of schema >: {
127+
* event_params_t
128+
* }
129+
*/
130+
typedef std::string event_str_t;
131+
132+
/*
133+
* Receive an event.
134+
* A blocking call.
135+
*
136+
* This API maintains an expected sequence number and use the received
137+
* sequence in event to compute missed events count.
138+
*
139+
* input:
140+
* handle - As obtained from events_init_subscriber
141+
*
142+
* output:
143+
* event - Received event.
144+
*
145+
* missed_cnt:
146+
* Count of missed events from this sender, before this event. Sum of
147+
* missed count from all received events will give the total missed.
148+
*/
149+
int missed_cnt;
150+
151+
*
152+
* return:
153+
* 0 - On success
154+
* -1 - On failure. The handle is not valid.
155+
*
156+
*/
157+
int event_receive(event_handle_t handle, event_str_t &event, int &missed_cnt);
158+

common/events_common.cpp

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#include "events_common.h"
2+
#include <boost/serialization/map.hpp>
3+
#include <boost/archive/text_iarchive.hpp>
4+
#include <boost/archive/text_oarchive.hpp>
5+
6+
/*
7+
* defaults for all config entries
8+
*/
9+
#define CFG_VAL map_str_str_t::value_type
10+
map_str_str_t cfg_data = {
11+
CFG_VAL(XSUB_END_KEY, "tcp://127.0.0.1:5570"),
12+
CFG_VAL(XPUB_END_KEY, "tcp://127.0.0.1:5571"),
13+
CFG_VAL(REQ_REP_END_KEY, "tcp://127.0.0.1:5572"),
14+
CFG_VAL(PAIR_END_KEY, "tcp://127.0.0.1:5573"),
15+
CFG_VAL(STATS_UPD_SECS, 5)
16+
};
17+
18+
void
19+
_read_init_config()
20+
{
21+
ifstream fs (INIT_CFG_PATH);
22+
23+
if (!fs.is_open())
24+
return;
25+
26+
stringstream buffer;
27+
buffer << fs.rdbuf();
28+
29+
const auto &data = nlohmann::json::parse(buffer.str());
30+
31+
const auto it = data.find(EVENTS_KEY);
32+
if (it == data.end())
33+
return;
34+
35+
const auto edata = *it;
36+
for (events_json_data_t::iterator itJ = cfg_data.begin();
37+
itJ != cfg_data.end(); ++itJ) {
38+
auto itE = edata.find(itJ->first);
39+
if (itE != edata.end()) {
40+
itJ->second = *itE;
41+
}
42+
}
43+
44+
return;
45+
}
46+
47+
string
48+
get_config(const string key)
49+
{
50+
static bool init = false;
51+
52+
if (!init) {
53+
_read_init_config();
54+
init = true;
55+
}
56+
/* Intentionally crash for non-existing key, as this
57+
* is internal code bug
58+
*/
59+
return cfg_data[key];
60+
}
61+
62+
const string
63+
get_timestamp()
64+
{
65+
std::stringstream ss, sfrac;
66+
67+
auto timepoint = system_clock::now();
68+
std::time_t tt = system_clock::to_time_t (timepoint);
69+
struct std::tm * ptm = std::localtime(&tt);
70+
71+
uint64_t ms = duration_cast<microseconds>(timepoint.time_since_epoch()).count();
72+
uint64_t sec = duration_cast<seconds>(timepoint.time_since_epoch()).count();
73+
uint64_t mfrac = ms - (sec * 1000 * 1000);
74+
75+
sfrac << mfrac;
76+
77+
ss << put_time(ptm, "%b %e %H:%M:%S.") << sfrac.str().substr(0, 6) << "Z";
78+
return ss.str();
79+
}
80+
81+
82+
/*
83+
* Way to serialize map
84+
* boost::archive::text_oarchive could be used to archive any struct/class
85+
* but that class needs some additional support, that declares
86+
* boost::serialization::access as private friend and couple more tweaks
87+
* std::map inherently supports serialization
88+
*/
89+
const string
90+
serialize(const map_str_str_t& data)
91+
{
92+
std::stringstream ss;
93+
boost::archive::text_oarchive oarch(ss);
94+
oarch << data;
95+
return ss.str();
96+
}
97+
98+
void
99+
deserialize(const string& s, map_str_str_t& data)
100+
{
101+
std::stringstream ss;
102+
ss << s;
103+
boost::archive::text_iarchive iarch(ss);
104+
iarch >> data;
105+
return;
106+
}
107+
108+
109+
void
110+
map_to_zmsg(const map_str_str_t& data, zmq_msg_t &msg)
111+
{
112+
string s = serialize(data);
113+
114+
zmq_msg_init_size(&msg, s.size());
115+
strncpy((char *)zmq_msg_data(&msg), s.c_str(), s.size());
116+
}
117+
118+
119+
void
120+
zmsg_to_map(zmq_msg_t &msg, map_str_str_t& data)
121+
{
122+
string s((const char *)zmq_msg_data(&msg), zmq_msg_size(&msg));
123+
deserialize(s, data);
124+
}
125+

common/events_common.h

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* common APIs used by events code.
3+
*/
4+
#include <chrono>
5+
#include <fstream>
6+
#include <thread>
7+
#include "string.h"
8+
#include "json.hpp"
9+
#include "zmq.h"
10+
#include <unordered_map>
11+
12+
#include "logger.h"
13+
#include "events.h"
14+
15+
using namespace std;
16+
using namespace chrono;
17+
18+
#define ERR_CHECK(res, ...) {\
19+
if (!(res)) \
20+
SWSS_LOG_ERROR(__VA_ARGS__); }
21+
22+
// Some simple definitions
23+
//
24+
typedef map<string, string> map_str_str_t;
25+
26+
/*
27+
* Config that can be read from init_cfg
28+
*/
29+
#define INIT_CFG_PATH "/etc/sonic/init_cfg.json"
30+
#define CFG_EVENTS_KEY "events"
31+
32+
/* configurable entities' keys */
33+
#define XSUB_END_KEY "xsub_path"
34+
#define XPUB_END_KEY "xpub_path"
35+
#define REQ_REP_END_KEY "req_rep_path"
36+
#define REQ_PAIR_KEY "pair_path"
37+
#define STATS_UPD_SECS "stats_upd_secs"
38+
39+
/* Provide a key for configurable entity */
40+
string get_config(const string key);
41+
42+
const string get_timestamp();
43+
44+
/*
45+
* events are published as two part zmq message.
46+
* First part only has the event source, so receivers could
47+
* filter by source.
48+
*
49+
* Second part contains event as defined below.
50+
*
51+
* The callers would only see, event_str_t
52+
*/
53+
typedef struct {
54+
event_str_t event;
55+
uint32_t runtime_id;
56+
uint32_t sequence;
57+
} internal_event_t;
58+
59+
const string serialize(const map_str_str_t & data);
60+
61+
void deserialize(const string& s, map_str_str_t & data);
62+
63+
void map_to_zmsg(const map_str_str_t & data, zmq_msg_t &msg);
64+
65+
void zmsg_to_map(zmq_msg_t &msg, map_str_str_t & data);
66+

0 commit comments

Comments
 (0)