Skip to content

Commit f4f3bfa

Browse files
liuh-80stepanblyschak
authored andcommitted
[202111] [select] break the select loop if interrupt_on_signal flag is set (sonic-net#624)
Signed-off-by: Stepan Blyschak <[email protected]>
1 parent 4a1edff commit f4f3bfa

File tree

6 files changed

+94
-21
lines changed

6 files changed

+94
-21
lines changed

common/configdb.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,11 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
7676
self.pubsub = self.get_redis_client(self.db_name).pubsub()
7777
self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name)))
7878
while True:
79-
item = self.pubsub.listen_message()
79+
item = self.pubsub.listen_message(interrupt_on_signal=True)
80+
if 'type' not in item:
81+
# When timeout or interrupted, item will not contains 'type'
82+
continue
83+
8084
if item['type'] == 'pmessage':
8185
key = item['channel'].split(':', 1)[1]
8286
try:

common/pubsub.cpp

+23-11
Original file line numberDiff line numberDiff line change
@@ -77,22 +77,31 @@ bool PubSub::hasCachedData()
7777
return m_keyspace_event_buffer.size() > 1;
7878
}
7979

80-
map<string, string> PubSub::get_message(double timeout)
80+
map<string, string> PubSub::get_message(double timeout, bool interrupt_on_signal)
8181
{
82-
map<string, string> ret;
82+
return get_message_internal(timeout, interrupt_on_signal).second;
83+
}
84+
85+
MessageResultPair PubSub::get_message_internal(double timeout, bool interrupt_on_signal)
86+
{
87+
MessageResultPair ret;
88+
8389
if (!m_subscribe)
8490
{
91+
ret.first = Select::ERROR;
8592
return ret;
8693
}
8794

8895
Selectable *selected;
89-
int rc = m_select.select(&selected, int(timeout));
96+
int rc = m_select.select(&selected, int(timeout), interrupt_on_signal);
97+
ret.first = rc;
9098
switch (rc)
9199
{
92100
case Select::ERROR:
93101
throw RedisError("Failed to select", m_subscribe->getContext());
94102

95103
case Select::TIMEOUT:
104+
case Select::SIGNALINT:
96105
return ret;
97106

98107
case Select::OBJECT:
@@ -110,26 +119,29 @@ map<string, string> PubSub::get_message(double timeout)
110119
}
111120

112121
auto message = event->getReply<RedisMessage>();
113-
ret["type"] = message.type;
114-
ret["pattern"] = message.pattern;
115-
ret["channel"] = message.channel;
116-
ret["data"] = message.data;
122+
ret.second["type"] = message.type;
123+
ret.second["pattern"] = message.pattern;
124+
ret.second["channel"] = message.channel;
125+
ret.second["data"] = message.data;
117126
return ret;
118127
}
119128

120129
// Note: it is not straightforward to implement redis-py PubSub.listen() directly in c++
121130
// due to the `yield` syntax, so we implement this function for blocking listen one message
122-
std::map<std::string, std::string> PubSub::listen_message()
131+
std::map<std::string, std::string> PubSub::listen_message(bool interrupt_on_signal)
123132
{
124133
const double GET_MESSAGE_INTERVAL = 600.0; // in seconds
134+
MessageResultPair ret;
125135
for (;;)
126136
{
127-
auto ret = get_message(GET_MESSAGE_INTERVAL);
128-
if (!ret.empty())
137+
ret = get_message_internal(GET_MESSAGE_INTERVAL, interrupt_on_signal);
138+
if (!ret.second.empty() || ret.first == Select::SIGNALINT)
129139
{
130-
return ret;
140+
break;
131141
}
132142
}
143+
144+
return ret.second;
133145
}
134146

135147
shared_ptr<RedisReply> PubSub::popEventBuffer()

common/pubsub.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,25 @@
11
#pragma once
22
#include <map>
33
#include <deque>
4+
#include <utility>
45

56
#include "dbconnector.h"
67
#include "select.h"
78
#include "redisselect.h"
89

910
namespace swss {
1011

12+
typedef std::pair<int, std::map<std::string, std::string> > MessageResultPair;
13+
1114
// This class is to emulate python redis-py class PubSub
1215
// After SWIG wrapping, it should be used in the same way
1316
class PubSub : protected RedisSelect
1417
{
1518
public:
1619
explicit PubSub(DBConnector *other);
1720

18-
std::map<std::string, std::string> get_message(double timeout = 0.0);
19-
std::map<std::string, std::string> listen_message();
21+
std::map<std::string, std::string> get_message(double timeout = 0.0, bool interrupt_on_signal = false);
22+
std::map<std::string, std::string> listen_message(bool interrupt_on_signal = false);
2023

2124
void psubscribe(const std::string &pattern);
2225
void punsubscribe(const std::string &pattern);
@@ -29,6 +32,7 @@ class PubSub : protected RedisSelect
2932
private:
3033
/* Pop keyspace event from event buffer. Caller should free resources. */
3134
std::shared_ptr<RedisReply> popEventBuffer();
35+
MessageResultPair get_message_internal(double timeout = 0.0, bool interrupt_on_signal = false);
3236

3337
DBConnector *m_parentConnector;
3438
Select m_select;

common/select.cpp

+23-5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <unistd.h>
1010
#include <string.h>
1111

12+
1213
using namespace std;
1314

1415
namespace swss {
@@ -87,20 +88,34 @@ void Select::addSelectables(vector<Selectable *> selectables)
8788
}
8889
}
8990

90-
int Select::poll_descriptors(Selectable **c, unsigned int timeout)
91+
int Select::poll_descriptors(Selectable **c, unsigned int timeout, bool interrupt_on_signal = false)
9192
{
9293
int sz_selectables = static_cast<int>(m_objects.size());
9394
std::vector<struct epoll_event> events(sz_selectables);
9495
int ret;
9596

96-
do
97+
while(true)
9798
{
9899
ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
100+
// on signal interrupt check if we need to return
101+
if (ret == -1 && errno == EINTR)
102+
{
103+
if (interrupt_on_signal)
104+
{
105+
return Select::SIGNALINT;
106+
}
107+
}
108+
// on all other errors break the loop
109+
else
110+
{
111+
break;
112+
}
99113
}
100-
while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal
101114

102115
if (ret < 0)
116+
{
103117
return Select::ERROR;
118+
}
104119

105120
for (int i = 0; i < ret; ++i)
106121
{
@@ -148,7 +163,7 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout)
148163
return Select::TIMEOUT;
149164
}
150165

151-
int Select::select(Selectable **c, int timeout)
166+
int Select::select(Selectable **c, int timeout, bool interrupt_on_signal)
152167
{
153168
SWSS_LOG_ENTER();
154169

@@ -164,7 +179,7 @@ int Select::select(Selectable **c, int timeout)
164179
return ret;
165180

166181
/* wait for data */
167-
ret = poll_descriptors(c, timeout);
182+
ret = poll_descriptors(c, timeout, interrupt_on_signal);
168183

169184
return ret;
170185

@@ -190,6 +205,9 @@ std::string Select::resultToString(int result)
190205
case swss::Select::TIMEOUT:
191206
return "TIMEOUT";
192207

208+
case swss::Select::SIGNALINT:
209+
return "SIGNALINT";
210+
193211
default:
194212
SWSS_LOG_WARN("unknown select result: %d", result);
195213
return "UNKNOWN";

common/select.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ class Select
3030
OBJECT = 0,
3131
ERROR = 1,
3232
TIMEOUT = 2,
33+
SIGNALINT = 3,// Read operation interrupted by a signal
3334
};
3435

35-
int select(Selectable **c, int timeout = -1);
36+
int select(Selectable **c, int timeout = -1, bool interrupt_on_signal = false);
3637
bool isQueueEmpty();
3738

3839
/**
@@ -65,7 +66,7 @@ class Select
6566
}
6667
};
6768

68-
int poll_descriptors(Selectable **c, unsigned int timeout);
69+
int poll_descriptors(Selectable **c, unsigned int timeout, bool interrupt_on_signal);
6970

7071
int m_epoll_fd;
7172
std::unordered_map<int, Selectable *> m_objects;

tests/test_signalhandler_ut.py

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import signal
2+
import os
3+
import pytest
4+
import multiprocessing
5+
import time
6+
from swsscommon import swsscommon
7+
8+
def test_config_db_listen_while_signal_received():
9+
""" Test performs ConfigDBConnector.listen() while signal is received,
10+
checks that the listen() call is interrupted and the regular KeyboardInterrupt is raised.
11+
"""
12+
c=swsscommon.ConfigDBConnector()
13+
c.subscribe('A', lambda a: None)
14+
c.connect(wait_for_init=False)
15+
event = multiprocessing.Event()
16+
17+
def signal_handler(signum, frame):
18+
event.set()
19+
sys.exit(0)
20+
21+
signal.signal(signal.SIGUSR1, signal_handler)
22+
23+
def listen():
24+
c.listen()
25+
26+
thr = multiprocessing.Process(target=listen)
27+
thr.start()
28+
29+
time.sleep(5)
30+
os.kill(thr.pid, signal.SIGUSR1)
31+
32+
thr.join()
33+
34+
assert event.is_set()

0 commit comments

Comments
 (0)