Commit d9d24b6
intermediate saving
1 parent 9763d3a commit d9d24b6

8 files changed: +460 -71 lines changed
common/eventd.cpp (+249)

@@ -0,0 +1,249 @@
#include <thread>
#include "eventd.h"

/*
 * There are 3 threads, including the main thread.
 *
 * main thread -- Runs the eventd service that accepts commands of event_req_type_t.
 *                This can be used to control caching of events and a no-op echo service.
 *
 * capture/cache service -- Saves all the events received between cache start & stop.
 *
 * main proxy service -- Runs the XSUB/XPUB ends of the proxy.
 */

#define READ_SET_SIZE 100


eventd_server::eventd_server() : m_capture(NULL)
{
    m_ctx = zmq_ctx_new();
    RET_ON_ERR(m_ctx != NULL, "Failed to get zmq ctx");
out:
    return;
}


eventd_server::~eventd_server()
{
    close();
}


int
eventd_server::zproxy_service()
{
    int ret = -1;
    SWSS_LOG_INFO("Start xpub/xsub proxy");

    void *frontend = zmq_socket(m_ctx, ZMQ_XSUB);
    RET_ON_ERR(frontend != NULL, "Failed to get ZMQ_XSUB socket");

    int rc = zmq_bind(frontend, get_config(XSUB_END_KEY));
    RET_ON_ERR(rc == 0, "Failed to bind XSUB to %s", get_config(XSUB_END_KEY));

    void *backend = zmq_socket(m_ctx, ZMQ_XPUB);
    RET_ON_ERR(backend != NULL, "Failed to get ZMQ_XPUB socket");

    rc = zmq_bind(backend, get_config(XPUB_END_KEY));
    RET_ON_ERR(rc == 0, "Failed to bind XPUB to %s", get_config(XPUB_END_KEY));

    void *capture = zmq_socket(m_ctx, ZMQ_PUB);
    RET_ON_ERR(capture != NULL, "Failed to get ZMQ_PUB socket for capture");

    rc = zmq_bind(capture, get_config(CAPTURE_END_KEY));
    RET_ON_ERR(rc == 0, "Failed to bind capture PUB to %s", get_config(CAPTURE_END_KEY));

    m_thread_proxy = thread(&eventd_server::zproxy_service_run, this, frontend,
            backend, capture);
    ret = 0;
out:
    return ret;
}

void
eventd_server::zproxy_service_run(void *frontend, void *backend, void *capture)
{
    SWSS_LOG_INFO("Running xpub/xsub proxy");

    /* Runs forever until the zmq context is terminated */
    zmq_proxy(frontend, backend, capture);

    zmq_close(frontend);
    zmq_close(backend);
    zmq_close(capture);

    SWSS_LOG_ERROR("Terminating xpub/xsub proxy");
}


int
eventd_server::capture_events()
{
    int ret = -1;

    /* Clean any pre-existing cache */
    events_data_lst_t().swap(m_events);
    last_events_t().swap(m_last_events);

    RET_ON_ERR(m_capture != NULL, "capture sock is not initialized yet");

    while(true) {
        zmq_msg_t msg;
        internal_event_t event;
        int more = 0;
        size_t more_size = sizeof (more);

        {
            /* First part is the event source/pattern; drop it */
            zmq_msg_t pat;
            zmq_msg_init(&pat);
            RET_ON_ERR(zmq_msg_recv(&pat, m_capture, 0) != -1,
                    "Failed to capture pattern");
            zmq_msg_close(&pat);
        }

        RET_ON_ERR(zmq_getsockopt(m_capture, ZMQ_RCVMORE, &more, &more_size) == 0,
                "Failed to get sockopt for capture sock");
        RET_ON_ERR(more, "Event data expected, but more is false");

        zmq_msg_init(&msg);
        RET_ON_ERR(zmq_msg_recv(&msg, m_capture, 0) != -1,
                "Failed to read event data");

        string s((const char *)zmq_msg_data(&msg), zmq_msg_size(&msg));
        zmq_msg_close(&msg);

        deserialize(s, event);

        m_last_events[event[EVENT_RUNTIME_ID]] = s;

        try
        {
            m_events.push_back(s);
        }
        catch (exception& e)
        {
            stringstream ss;
            ss << e.what();
            SWSS_LOG_ERROR("Cache save event failed with %s events:size=%d",
                    ss.str().c_str(), (int)m_events.size());
            goto out;
        }
    }
out:
    /* Destroy the service and exit the thread */
    close();
    return 0;
}

int
eventd_server::eventd_service()
{
    event_service service;

    RET_ON_ERR(zproxy_service() == 0, "Failed to start zproxy_service");

    RET_ON_ERR(service.init_server(m_ctx) == 0, "Failed to init service");

    while(true) {
        int code, resp_code = -1;
        vector<events_cache_type_t> req_data, resp_data;

        RET_ON_ERR(service.channel_read(code, req_data) == 0,
                "Failed to read request");

        switch(code) {
            case EVENT_CACHE_START:
            {
                if (m_capture != NULL) {
                    resp_code = 1;
                    break;
                }
                m_capture = zmq_socket(m_ctx, ZMQ_SUB);
                RET_ON_ERR(m_capture != NULL, "Failed to get ZMQ_SUB socket for capture");

                int rc = zmq_connect(m_capture, get_config(CAPTURE_END_KEY));
                RET_ON_ERR(rc == 0, "Failed to connect capture SUB to %s",
                        get_config(CAPTURE_END_KEY));

                rc = zmq_setsockopt(m_capture, ZMQ_SUBSCRIBE, "", 0);
                RET_ON_ERR(rc == 0, "Failed to ZMQ_SUBSCRIBE");

                /* Kick off the capture service */
                m_thread_capture = thread(&eventd_server::capture_events, this);

                resp_code = 0;
                break;
            }

            case EVENT_CACHE_STOP:
                resp_code = 0;
                if (m_capture != NULL) {
                    zmq_close(m_capture);
                    m_capture = NULL;

                    /* Wait for the capture thread to end */
                    m_thread_capture.join();
                }
                break;

            case EVENT_CACHE_READ:
            {
                resp_code = 0;

                if (m_events.empty()) {
                    /* Nothing cached in order; fall back to last event per runtime id */
                    for (last_events_t::iterator it = m_last_events.begin();
                            it != m_last_events.end(); ++it) {
                        m_events.push_back(it->second);
                    }
                    last_events_t().swap(m_last_events);
                }

                int sz = m_events.size() < READ_SET_SIZE ? m_events.size() : READ_SET_SIZE;

                auto it = std::next(m_events.begin(), sz);
                move(m_events.begin(), it, back_inserter(resp_data));

                if (sz == (int)m_events.size()) {
                    events_data_lst_t().swap(m_events);
                } else {
                    m_events.erase(m_events.begin(), it);
                }
                break;
            }

            case EVENT_ECHO:
                resp_code = 0;
                resp_data.swap(req_data);
                break;

            default:
                SWSS_LOG_ERROR("Unexpected request: %d", code);
                assert(false);
                break;
        }
        RET_ON_ERR(service.channel_write(resp_code, resp_data) == 0,
                "Failed to write response back");
    }
out:
    /* Breaks here only on fatal failure */
    if (m_capture != NULL) {
        zmq_close(m_capture);
        m_capture = NULL;
    }
    close();
    m_thread_proxy.join();
    m_thread_capture.join();
    return 0;
}


void eventd_server::close()
{
    zmq_ctx_term(m_ctx); m_ctx = NULL;
}
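The EVENT_CACHE_READ branch above drains the cache in batches of at most READ_SET_SIZE entries per request, erasing only what was handed out. A minimal, self-contained sketch of that batching step, assuming plain std::string entries (drain_batch is a hypothetical helper for illustration, not part of this commit):

```cpp
#include <algorithm>
#include <iterator>
#include <string>
#include <vector>

static const size_t READ_SET_SIZE = 100;

/* Move at most READ_SET_SIZE entries from the front of the cache
 * into the response batch; erase only what was moved. */
std::vector<std::string> drain_batch(std::vector<std::string> &cache)
{
    std::vector<std::string> batch;
    size_t sz = std::min(cache.size(), READ_SET_SIZE);
    auto it = std::next(cache.begin(), sz);
    std::move(cache.begin(), it, std::back_inserter(batch));
    if (sz == cache.size()) {
        std::vector<std::string>().swap(cache);   /* release capacity too */
    } else {
        cache.erase(cache.begin(), it);
    }
    return batch;
}
```

Repeated reads thus return the cached events in arrival order, a page at a time, until the cache is empty.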

common/eventd.h (+101)

@@ -0,0 +1,101 @@
#include "events_service.h"

typedef map<runtime_id_t, events_cache_type_t> last_events_t;

class eventd_server {
    public:
        /* Creates the zmq context */
        eventd_server();

        ~eventd_server();

        /*
         * Main eventd service loop that honors event_req_type_t.
         *
         * For echo, it just echoes the request back.
         *
         * For cache start, it creates the SUB end of capture and kicks off the
         * capture_events thread. Upon the cache stop command, it closes the handle,
         * which stops the caching thread with a read failure.
         *
         * For cache read, it returns the collected events as a subset of
         * strings.
         */
        int eventd_service();


        /*
         * For any fatal failure, terminate the entire run across threads
         * by deleting the zmq context.
         */
        void close();

    private:
        /*
         * Started by eventd_service.
         * Creates XPUB & XSUB end points and binds them.
         * Creates a PUB socket end point for capture and binds it.
         * Calls the run method with the sockets in a dedicated thread.
         * The thread runs forever until the zmq context is terminated.
         */
        int zproxy_service();
        void zproxy_service_run(void *front, void *back, void *capture);


        /*
         * Capture/Cache service
         *
         * The service is started in a dedicated thread upon demand.
         * It expects the SUB end of capture to be created & connected to the
         * PUB capture end in the zproxy service.
         *
         * It goes into a forever loop, until the zmq read fails, which happens
         * when the capture/SUB end is closed. The stop cache service closes it,
         * while the start cache service creates & connects it.
         *
         * Hence this thread/function is active between cache start & stop.
         *
         * Each event has 2 parts. It drops the first part, which is
         * more for filtering events. It creates a string from the second part
         * and saves it.
         *
         * The string is the serialized version of internal_event_t.
         *
         * It keeps two sets of data:
         * 1) List of all events received, in a vector, in the same order as received
         * 2) Map of the last event from each runtime id
         *
         * We add to the vector as much as allowed by the vector and the
         * available memory. When memory is exhausted, we just keep updating the
         * map with the last event from each sender.
         *
         * The sequence number in the map will help assess the missed count.
         *
         * The thread is started upon creating the SUB end of the capture socket.
         */
        int capture_events();


    private:
        void *m_ctx;

        events_data_lst_t m_events;

        last_events_t m_last_events;

        void *m_capture;

        thread m_thread_proxy;
        thread m_thread_capture;
};
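The comment on capture_events above describes a two-level cache: keep every event in arrival order while resources allow, and always keep the last event per runtime id so the missed count can later be derived from sequence numbers. A minimal, self-contained sketch of that idea (the names event_cache, max_events, and the std::string runtime_id_t are assumptions for illustration, not part of this commit):

```cpp
#include <map>
#include <string>
#include <vector>

typedef std::string runtime_id_t;   /* placeholder; the real type is defined elsewhere */

struct event_cache {
    std::vector<std::string> events;                  /* all events, in arrival order  */
    std::map<runtime_id_t, std::string> last_event;   /* latest event per runtime id   */
    size_t max_events = 1000;                         /* assumed memory bound          */

    void save(const runtime_id_t &rid, const std::string &serialized)
    {
        last_event[rid] = serialized;        /* always track the latest per sender */
        if (events.size() < max_events) {
            events.push_back(serialized);    /* keep full history while room allows */
        }
    }
};
```

Once the ordered vector stops growing, the map still carries one entry per sender, which is what the cache-read fallback in eventd_service returns.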

common/events.cpp (+2 -1)

@@ -91,7 +91,8 @@ EventPublisher::event_publish(const string tag, const event_params_t *params)
     }
     event_data[EVENT_RUNTIME_ID] = m_runtime_id;

-    map_to_zmsg(event_data, msg);
+    RET_ON_ERR(map_to_zmsg(event_data, msg) == 0,
+            "Failed to build msg data size=%d", event_data.size());

     // Send the message
     // First part -- The event-source/pattern