Skip to content

Commit af80c12

Browse files
stepanblyschakyxieca
authored andcommitted
[select] break the select loop if interrupt_on_signal flag is set (#624)
1 parent 749cd6f commit af80c12

10 files changed

+66
-167
lines changed

common/Makefile.am

-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ libswsscommon_la_SOURCES = \
4545
select.cpp \
4646
selectableevent.cpp \
4747
selectabletimer.cpp \
48-
signalhandlerhelper.cpp \
4948
consumertable.cpp \
5049
consumertablebase.cpp \
5150
consumerstatetable.cpp \

common/configdb.h

+5-5
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
7676
## Start listen Redis keyspace event. Pass a callback function to `init` to handle initial table data.
7777
self.pubsub = self.get_redis_client(self.db_name).pubsub()
7878
self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name)))
79-
79+
8080
# Build a cache of data for all subscribed tables that will recieve the initial table data so we dont send duplicate event notifications
8181
init_data = {tbl: self.get_table(tbl) for tbl in self.handlers if init_data_handler or self.fire_init_data[tbl]}
8282

@@ -88,16 +88,16 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
8888
return False
8989
return True
9090

91-
init_callback_data = {tbl: data for tbl, data in init_data.items() if load_data(tbl, data)}
91+
init_callback_data = {tbl: data for tbl, data in init_data.items() if load_data(tbl, data)}
9292

9393
# Pass all initial data that we DID NOT send as updates to handlers through the init callback if provided by caller
9494
if init_data_handler:
9595
init_data_handler(init_callback_data)
9696

97-
while not SignalHandlerHelper.checkSignal(SIGNAL_INT):
98-
item = self.pubsub.listen_message()
97+
while True:
98+
item = self.pubsub.listen_message(interrupt_on_signal=True)
9999
if 'type' not in item:
100-
# When timeout or cancelled, item will not contains 'type'
100+
# When timeout or interrupted, item will not contains 'type'
101101
continue
102102

103103
if item['type'] == 'pmessage':

common/pubsub.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,12 @@ bool PubSub::hasCachedData()
7777
return m_keyspace_event_buffer.size() > 1;
7878
}
7979

80-
map<string, string> PubSub::get_message(double timeout)
80+
map<string, string> PubSub::get_message(double timeout, bool interrupt_on_signal)
8181
{
82-
return get_message_internal(timeout).second;
82+
return get_message_internal(timeout, interrupt_on_signal).second;
8383
}
8484

85-
MessageResultPair PubSub::get_message_internal(double timeout)
85+
MessageResultPair PubSub::get_message_internal(double timeout, bool interrupt_on_signal)
8686
{
8787
MessageResultPair ret;
8888

@@ -93,7 +93,7 @@ MessageResultPair PubSub::get_message_internal(double timeout)
9393
}
9494

9595
Selectable *selected;
96-
int rc = m_select.select(&selected, int(timeout));
96+
int rc = m_select.select(&selected, int(timeout), interrupt_on_signal);
9797
ret.first = rc;
9898
switch (rc)
9999
{
@@ -128,13 +128,13 @@ MessageResultPair PubSub::get_message_internal(double timeout)
128128

129129
// Note: it is not straightforward to implement redis-py PubSub.listen() directly in c++
130130
// due to the `yield` syntax, so we implement this function for blocking listen one message
131-
std::map<std::string, std::string> PubSub::listen_message()
131+
std::map<std::string, std::string> PubSub::listen_message(bool interrupt_on_signal)
132132
{
133133
const double GET_MESSAGE_INTERVAL = 600.0; // in seconds
134134
MessageResultPair ret;
135135
for (;;)
136136
{
137-
ret = get_message_internal(GET_MESSAGE_INTERVAL);
137+
ret = get_message_internal(GET_MESSAGE_INTERVAL, interrupt_on_signal);
138138
if (!ret.second.empty() || ret.first == Select::SIGNALINT)
139139
{
140140
break;

common/pubsub.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ class PubSub : protected RedisSelect
1818
public:
1919
explicit PubSub(DBConnector *other);
2020

21-
std::map<std::string, std::string> get_message(double timeout = 0.0);
22-
std::map<std::string, std::string> listen_message();
21+
std::map<std::string, std::string> get_message(double timeout = 0.0, bool interrupt_on_signal = false);
22+
std::map<std::string, std::string> listen_message(bool interrupt_on_signal = false);
2323

2424
void psubscribe(const std::string &pattern);
2525
void punsubscribe(const std::string &pattern);
@@ -32,7 +32,7 @@ class PubSub : protected RedisSelect
3232
private:
3333
/* Pop keyspace event from event buffer. Caller should free resources. */
3434
std::shared_ptr<RedisReply> popEventBuffer();
35-
MessageResultPair get_message_internal(double timeout = 0.0);
35+
MessageResultPair get_message_internal(double timeout = 0.0, bool interrupt_on_signal = false);
3636

3737
DBConnector *m_parentConnector;
3838
Select m_select;

common/select.cpp

+19-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#include "common/selectable.h"
22
#include "common/logger.h"
33
#include "common/select.h"
4-
#include "common/signalhandlerhelper.h"
54
#include <algorithm>
65
#include <stdio.h>
76
#include <sys/time.h>
@@ -89,26 +88,34 @@ void Select::addSelectables(vector<Selectable *> selectables)
8988
}
9089
}
9190

92-
int Select::poll_descriptors(Selectable **c, unsigned int timeout)
91+
int Select::poll_descriptors(Selectable **c, unsigned int timeout, bool interrupt_on_signal = false)
9392
{
9493
int sz_selectables = static_cast<int>(m_objects.size());
9594
std::vector<struct epoll_event> events(sz_selectables);
9695
int ret;
9796

98-
do
97+
while(true)
9998
{
10099
ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
101-
}
102-
while(ret == -1 && errno == EINTR && !SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)); // Retry the select if the process was interrupted by a signal
103-
104-
if (SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT))
105-
{
106-
// Return if the epoll_wait was interrupted by SIGTERM
107-
return Select::SIGNALINT;
100+
// on signal interrupt check if we need to return
101+
if (ret == -1 && errno == EINTR)
102+
{
103+
if (interrupt_on_signal)
104+
{
105+
return Select::SIGNALINT;
106+
}
107+
}
108+
// on all other errors break the loop
109+
else
110+
{
111+
break;
112+
}
108113
}
109114

110115
if (ret < 0)
116+
{
111117
return Select::ERROR;
118+
}
112119

113120
for (int i = 0; i < ret; ++i)
114121
{
@@ -156,7 +163,7 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout)
156163
return Select::TIMEOUT;
157164
}
158165

159-
int Select::select(Selectable **c, int timeout)
166+
int Select::select(Selectable **c, int timeout, bool interrupt_on_signal)
160167
{
161168
SWSS_LOG_ENTER();
162169

@@ -172,7 +179,7 @@ int Select::select(Selectable **c, int timeout)
172179
return ret;
173180

174181
/* wait for data */
175-
ret = poll_descriptors(c, timeout);
182+
ret = poll_descriptors(c, timeout, interrupt_on_signal);
176183

177184
return ret;
178185

common/select.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ class Select
3030
OBJECT = 0,
3131
ERROR = 1,
3232
TIMEOUT = 2,
33-
SIGNALINT = 3,// Read operation interrupted by SIGINT
33+
SIGNALINT = 3,// Read operation interrupted by a signal
3434
};
3535

36-
int select(Selectable **c, int timeout = -1);
36+
int select(Selectable **c, int timeout = -1, bool interrupt_on_signal = false);
3737
bool isQueueEmpty();
3838

3939
/**
@@ -66,7 +66,7 @@ class Select
6666
}
6767
};
6868

69-
int poll_descriptors(Selectable **c, unsigned int timeout);
69+
int poll_descriptors(Selectable **c, unsigned int timeout, bool interrupt_on_signal);
7070

7171
int m_epoll_fd;
7272
std::unordered_map<int, Selectable *> m_objects;

common/signalhandlerhelper.cpp

-70
This file was deleted.

common/signalhandlerhelper.h

-36
This file was deleted.

pyext/swsscommon.i

-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
#include "pubsub.h"
2020
#include "select.h"
2121
#include "selectable.h"
22-
#include "signalhandlerhelper.h"
2322
#include "rediscommand.h"
2423
#include "table.h"
2524
#include "countertable.h"
@@ -156,7 +155,6 @@ T castSelectableObj(swss::Selectable *temp)
156155
%include "pubsub.h"
157156
%include "selectable.h"
158157
%include "select.h"
159-
%include "signalhandlerhelper.h"
160158
%include "rediscommand.h"
161159
%include "redispipeline.h"
162160
%include "redisselect.h"

tests/test_signalhandler_ut.py

+30-29
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,34 @@
11
import signal
22
import os
33
import pytest
4+
import multiprocessing
5+
import time
46
from swsscommon import swsscommon
5-
from swsscommon.swsscommon import SignalHandlerHelper
6-
7-
def dummy_signal_handler(signum, stack):
8-
# ignore signal so UT will not break
9-
pass
10-
11-
def test_SignalHandler():
12-
signal.signal(signal.SIGUSR1, dummy_signal_handler)
13-
14-
# Register SIGUSER1
15-
SignalHandlerHelper.registerSignalHandler(signal.SIGUSR1)
16-
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
17-
assert happened == False
18-
19-
# trigger SIGUSER manually
20-
os.kill(os.getpid(), signal.SIGUSR1)
21-
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
22-
assert happened == True
23-
24-
# Reset signal
25-
SignalHandlerHelper.resetSignal(signal.SIGUSR1)
26-
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
27-
assert happened == False
28-
29-
# un-register signal handler
30-
SignalHandlerHelper.restoreSignalHandler(signal.SIGUSR1)
31-
os.kill(os.getpid(), signal.SIGUSR1)
32-
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
33-
assert happened == False
7+
8+
def test_config_db_listen_while_signal_received():
9+
""" Test performs ConfigDBConnector.listen() while signal is received,
10+
checks that the listen() call is interrupted and the regular KeyboardInterrupt is raised.
11+
"""
12+
c=swsscommon.ConfigDBConnector()
13+
c.subscribe('A', lambda a: None)
14+
c.connect(wait_for_init=False)
15+
event = multiprocessing.Event()
16+
17+
def signal_handler(signum, frame):
18+
event.set()
19+
sys.exit(0)
20+
21+
signal.signal(signal.SIGUSR1, signal_handler)
22+
23+
def listen():
24+
c.listen()
25+
26+
thr = multiprocessing.Process(target=listen)
27+
thr.start()
28+
29+
time.sleep(5)
30+
os.kill(thr.pid, signal.SIGUSR1)
31+
32+
thr.join()
33+
34+
assert event.is_set()

0 commit comments

Comments
 (0)