From 44c1e4ec2f9a4a80f78034924150185439ae9884 Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Tue, 24 May 2022 13:00:28 +0300 Subject: [PATCH 1/5] [select] break the select loop if interrupt_on_signal flag is set Signed-off-by: Stepan Blyschak --- common/configdb.h | 8 ++++---- common/pubsub.cpp | 12 ++++++------ common/pubsub.h | 6 +++--- common/select.cpp | 30 +++++++++++++++++++----------- common/select.h | 4 ++-- tests/test_configdb.py | 26 ++++++++++++++++++++++++++ 6 files changed, 60 insertions(+), 26 deletions(-) create mode 100755 tests/test_configdb.py diff --git a/common/configdb.h b/common/configdb.h index e1f82b454..3875b7f64 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -76,7 +76,7 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native ## Start listen Redis keyspace event. Pass a callback function to `init` to handle initial table data. self.pubsub = self.get_redis_client(self.db_name).pubsub() self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name))) - + # Build a cache of data for all subscribed tables that will recieve the initial table data so we dont send duplicate event notifications init_data = {tbl: self.get_table(tbl) for tbl in self.handlers if init_data_handler or self.fire_init_data[tbl]} @@ -88,16 +88,16 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native return False return True - init_callback_data = {tbl: data for tbl, data in init_data.items() if load_data(tbl, data)} + init_callback_data = {tbl: data for tbl, data in init_data.items() if load_data(tbl, data)} # Pass all initial data that we DID NOT send as updates to handlers through the init callback if provided by caller if init_data_handler: init_data_handler(init_callback_data) while not SignalHandlerHelper.checkSignal(SIGNAL_INT): - item = self.pubsub.listen_message() + item = self.pubsub.listen_message(interrupt_on_signal=True) if 'type' not in item: - # When timeout or cancelled, item will not contains 'type' + # When timeout or cancelled, item will not contains 'type' continue if item['type'] == 'pmessage': diff --git a/common/pubsub.cpp b/common/pubsub.cpp index 0b5fb808c..4eb48def0 100644 --- a/common/pubsub.cpp +++ b/common/pubsub.cpp @@ -77,12 +77,12 @@ bool PubSub::hasCachedData() return m_keyspace_event_buffer.size() > 1; } -map PubSub::get_message(double timeout) +map PubSub::get_message(double timeout, bool interrupt_on_signal) { - return get_message_internal(timeout).second; + return get_message_internal(timeout, interrupt_on_signal).second; } -MessageResultPair PubSub::get_message_internal(double timeout) +MessageResultPair PubSub::get_message_internal(double timeout, bool interrupt_on_signal) { MessageResultPair ret; @@ -93,7 +93,7 @@ MessageResultPair PubSub::get_message_internal(double timeout) } Selectable *selected; - int rc = m_select.select(&selected, int(timeout)); + int rc = m_select.select(&selected, int(timeout), interrupt_on_signal); ret.first = rc; switch (rc) { @@ -128,13 +128,13 @@ MessageResultPair PubSub::get_message_internal(double timeout) // Note: it is not straightforward to implement redis-py PubSub.listen() directly in c++ // due to the `yield` syntax, so we implement this function for blocking listen one message -std::map PubSub::listen_message() +std::map PubSub::listen_message(bool interrupt_on_signal) { const double GET_MESSAGE_INTERVAL = 600.0; // in seconds MessageResultPair ret; for (;;) { - ret = get_message_internal(GET_MESSAGE_INTERVAL); + ret = get_message_internal(GET_MESSAGE_INTERVAL, interrupt_on_signal); if (!ret.second.empty() || ret.first == Select::SIGNALINT) { break; diff --git a/common/pubsub.h b/common/pubsub.h index 05e8849ad..567f83114 100644 --- a/common/pubsub.h +++ b/common/pubsub.h @@ -18,8 +18,8 @@ class PubSub : protected RedisSelect public: explicit PubSub(DBConnector *other); - std::map get_message(double timeout = 0.0); - std::map listen_message(); + std::map get_message(double timeout = 0.0, bool interrupt_on_signal = false); + std::map listen_message(bool interrupt_on_signal = false); void psubscribe(const std::string &pattern); void punsubscribe(const std::string &pattern); @@ -32,7 +32,7 @@ class PubSub : protected RedisSelect private: /* Pop keyspace event from event buffer. Caller should free resources. */ std::shared_ptr popEventBuffer(); - MessageResultPair get_message_internal(double timeout = 0.0); + MessageResultPair get_message_internal(double timeout = 0.0, bool interrupt_on_signal = false); DBConnector *m_parentConnector; Select m_select; diff --git a/common/select.cpp b/common/select.cpp index 9150af874..43d268a27 100644 --- a/common/select.cpp +++ b/common/select.cpp @@ -89,26 +89,34 @@ void Select::addSelectables(vector selectables) } } -int Select::poll_descriptors(Selectable **c, unsigned int timeout) +int Select::poll_descriptors(Selectable **c, unsigned int timeout, bool interrupt_on_signal = false) { int sz_selectables = static_cast(m_objects.size()); std::vector events(sz_selectables); int ret; - do + while(true) { ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout); - } - while(ret == -1 && errno == EINTR && !SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)); // Retry the select if the process was interrupted by a signal - - if (SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)) - { - // Return if the epoll_wait was interrupted by SIGTERM - return Select::SIGNALINT; + // on signal interrupt check if we need to return + if (ret == -1 && errno == EINTR) + { + if (interrupt_on_signal || SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)) + { + return Select::SIGNALINT; + } + } + // on all other errors break the loop + else + { + break; + } } if (ret < 0) + { return Select::ERROR; + } for (int i = 0; i < ret; ++i) { @@ -156,7 +164,7 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout) return Select::TIMEOUT; } -int Select::select(Selectable **c, int timeout) +int Select::select(Selectable **c, int timeout, bool interrupt_on_signal) { SWSS_LOG_ENTER(); @@ -172,7 +180,7 @@ int Select::select(Selectable **c, int timeout) return ret; /* wait for data */ - ret = poll_descriptors(c, timeout); + ret = poll_descriptors(c, timeout, interrupt_on_signal); return ret; diff --git a/common/select.h b/common/select.h index 2d41721aa..2a8934873 100644 --- a/common/select.h +++ b/common/select.h @@ -33,7 +33,7 @@ class Select SIGNALINT = 3,// Read operation interrupted by SIGINT }; - int select(Selectable **c, int timeout = -1); + int select(Selectable **c, int timeout = -1, bool interrupt_on_signal = false); bool isQueueEmpty(); /** @@ -66,7 +66,7 @@ class Select } }; - int poll_descriptors(Selectable **c, unsigned int timeout); + int poll_descriptors(Selectable **c, unsigned int timeout, bool interrupt_on_signal); int m_epoll_fd; std::unordered_map m_objects; diff --git a/tests/test_configdb.py b/tests/test_configdb.py new file mode 100755 index 000000000..3b1c25f1a --- /dev/null +++ b/tests/test_configdb.py @@ -0,0 +1,26 @@ +import signal +import os +import pytest +import threading +import time +from swsscommon import swsscommon + +def test_config_db_listen_while_signal_received(): + """ Test performs ConfigDBConnector.listen() while signal is received, + checks that the listen() call is interrupted and the regular KeyboardInterrupt is raised. + """ + c=swsscommon.ConfigDBConnector() + c.subscribe('A', lambda a: None) + c.connect(wait_for_init=False) + + def deferred_sigint(): + time.sleep(10) + os.kill(os.getpid(), signal.SIGINT) + + thr = threading.Thread(target=deferred_sigint) + thr.start() + + with pytest.raises(KeyboardInterrupt): + c.listen() + + thr.join() From 95e3fd2ac683adf70e761569d57bc20cdc8b9156 Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Mon, 30 May 2022 00:30:06 +0300 Subject: [PATCH 2/5] make test work on python2 Signed-off-by: Stepan Blyschak --- tests/test_configdb.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/tests/test_configdb.py b/tests/test_configdb.py index 3b1c25f1a..003019065 100755 --- a/tests/test_configdb.py +++ b/tests/test_configdb.py @@ -1,7 +1,8 @@ import signal import os +import sys import pytest -import threading +import multiprocessing import time from swsscommon import swsscommon @@ -12,15 +13,23 @@ def test_config_db_listen_while_signal_received(): c=swsscommon.ConfigDBConnector() c.subscribe('A', lambda a: None) c.connect(wait_for_init=False) + event = multiprocessing.Event() - def deferred_sigint(): - time.sleep(10) - os.kill(os.getpid(), signal.SIGINT) + def signal_handler(signum, frame): + event.set() + sys.exit(0) - thr = threading.Thread(target=deferred_sigint) - thr.start() + signal.signal(signal.SIGUSR1, signal_handler) - with pytest.raises(KeyboardInterrupt): + def listen(): c.listen() + thr = multiprocessing.Process(target=listen) + thr.start() + + time.sleep(5) + os.kill(thr.pid, signal.SIGUSR1) + thr.join() + + assert event.is_set() From 84f29233a96a9a7f3c67f75155b18a9d732a7c44 Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Mon, 30 May 2022 00:48:16 +0300 Subject: [PATCH 3/5] move the test Signed-off-by: Stepan Blyschak --- tests/test_configdb.py | 35 -------------------------------- tests/test_signalhandler_ut.py | 37 +++++++++++++++++++++++++++++++--- 2 files changed, 34 insertions(+), 38 deletions(-) delete mode 100755 tests/test_configdb.py diff --git a/tests/test_configdb.py b/tests/test_configdb.py deleted file mode 100755 index 003019065..000000000 --- a/tests/test_configdb.py +++ /dev/null @@ -1,35 +0,0 @@ -import signal -import os -import sys -import pytest -import multiprocessing -import time -from swsscommon import swsscommon - -def test_config_db_listen_while_signal_received(): - """ Test performs ConfigDBConnector.listen() while signal is received, - checks that the listen() call is interrupted and the regular KeyboardInterrupt is raised. - """ - c=swsscommon.ConfigDBConnector() - c.subscribe('A', lambda a: None) - c.connect(wait_for_init=False) - event = multiprocessing.Event() - - def signal_handler(signum, frame): - event.set() - sys.exit(0) - - signal.signal(signal.SIGUSR1, signal_handler) - - def listen(): - c.listen() - - thr = multiprocessing.Process(target=listen) - thr.start() - - time.sleep(5) - os.kill(thr.pid, signal.SIGUSR1) - - thr.join() - - assert event.is_set() diff --git a/tests/test_signalhandler_ut.py b/tests/test_signalhandler_ut.py index bed883b5d..316f2f421 100755 --- a/tests/test_signalhandler_ut.py +++ b/tests/test_signalhandler_ut.py @@ -1,6 +1,8 @@ import signal import os import pytest +import multiprocessing +import time from swsscommon import swsscommon from swsscommon.swsscommon import SignalHandlerHelper @@ -20,14 +22,43 @@ def test_SignalHandler(): os.kill(os.getpid(), signal.SIGUSR1) happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) assert happened == True - + # Reset signal SignalHandlerHelper.resetSignal(signal.SIGUSR1) happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) assert happened == False - + # un-register signal handler SignalHandlerHelper.restoreSignalHandler(signal.SIGUSR1) os.kill(os.getpid(), signal.SIGUSR1) happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) - assert happened == False \ No newline at end of file + assert happened == False + + +def test_config_db_listen_while_signal_received(): + """ Test performs ConfigDBConnector.listen() while signal is received, + checks that the listen() call is interrupted and the regular KeyboardInterrupt is raised. + """ + c=swsscommon.ConfigDBConnector() + c.subscribe('A', lambda a: None) + c.connect(wait_for_init=False) + event = multiprocessing.Event() + + def signal_handler(signum, frame): + event.set() + sys.exit(0) + + signal.signal(signal.SIGUSR1, signal_handler) + + def listen(): + c.listen() + + thr = multiprocessing.Process(target=listen) + thr.start() + + time.sleep(5) + os.kill(thr.pid, signal.SIGUSR1) + + thr.join() + + assert event.is_set() From 612b12081e056ffdad3e2882b5dd639aba613f85 Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Wed, 15 Jun 2022 17:00:38 +0300 Subject: [PATCH 4/5] handle review comments Signed-off-by: Stepan Blyschak --- common/configdb.h | 4 +- common/select.cpp | 3 +- common/select.h | 2 +- common/signalhandlerhelper.cpp | 70 ---------------------------------- common/signalhandlerhelper.h | 36 ----------------- pyext/swsscommon.i | 2 - tests/test_signalhandler_ut.py | 30 --------------- 7 files changed, 4 insertions(+), 143 deletions(-) delete mode 100644 common/signalhandlerhelper.cpp delete mode 100644 common/signalhandlerhelper.h diff --git a/common/configdb.h b/common/configdb.h index 3875b7f64..bad029bd5 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -94,10 +94,10 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native if init_data_handler: init_data_handler(init_callback_data) - while not SignalHandlerHelper.checkSignal(SIGNAL_INT): + while True: item = self.pubsub.listen_message(interrupt_on_signal=True) if 'type' not in item: - # When timeout or cancelled, item will not contains 'type' + # When timeout or interrupted, item will not contains 'type' continue if item['type'] == 'pmessage': diff --git a/common/select.cpp b/common/select.cpp index 43d268a27..61b5a06a8 100644 --- a/common/select.cpp +++ b/common/select.cpp @@ -1,7 +1,6 @@ #include "common/selectable.h" #include "common/logger.h" #include "common/select.h" -#include "common/signalhandlerhelper.h" #include #include #include @@ -101,7 +100,7 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout, bool interrup // on signal interrupt check if we need to return if (ret == -1 && errno == EINTR) { - if (interrupt_on_signal || SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)) + if (interrupt_on_signal) { return Select::SIGNALINT; } diff --git a/common/select.h b/common/select.h index 2a8934873..b658729f3 100644 --- a/common/select.h +++ b/common/select.h @@ -30,7 +30,7 @@ class Select OBJECT = 0, ERROR = 1, TIMEOUT = 2, - SIGNALINT = 3,// Read operation interrupted by SIGINT + SIGNALINT = 3,// Read operation interrupted by a signal }; int select(Selectable **c, int timeout = -1, bool interrupt_on_signal = false); diff --git a/common/signalhandlerhelper.cpp b/common/signalhandlerhelper.cpp deleted file mode 100644 index 0f1b0f36c..000000000 --- a/common/signalhandlerhelper.cpp +++ /dev/null @@ -1,70 +0,0 @@ -#include -#include "common/logger.h" -#include "signalhandlerhelper.h" - -using namespace swss; - -std::map SignalHandlerHelper::m_signalStatusMapping; -std::map SignalHandlerHelper::m_sigActionMapping; - -void SignalHandlerHelper::registerSignalHandler(int signalNumber) -{ - auto result = m_sigActionMapping.find(signalNumber); - if (result != m_sigActionMapping.end()) - { - // signal action already registered - SWSS_LOG_WARN("sigaction for %d already registered.", signalNumber); - return; - } - - m_signalStatusMapping[signalNumber] = false; - - SigActionPair sig_action_pair; - auto *new_action = &sig_action_pair.first; - auto *old_action = &sig_action_pair.second; - - new_action->sa_handler = SignalHandlerHelper::onSignal; - sigemptyset(&new_action->sa_mask); - new_action->sa_flags = 0; - - // always replace old action even old action is ignore signal - sigaction(signalNumber, new_action, old_action); - - m_sigActionMapping[signalNumber] = sig_action_pair; -} - -void SignalHandlerHelper::restoreSignalHandler(int signalNumber) -{ - auto result = m_sigActionMapping.find(signalNumber); - if (result == m_sigActionMapping.end()) - { - // signal action does not registered - SWSS_LOG_WARN("sigaction for %d does not registered.",signalNumber); - return; - } - - auto *old_action = &result->second.second; - - sigaction(signalNumber, old_action, NULL); -} - -void SignalHandlerHelper::onSignal(int signalNumber) -{ - m_signalStatusMapping[signalNumber] = true; -} - -bool SignalHandlerHelper::checkSignal(int signalNumber) -{ - auto result = m_signalStatusMapping.find(signalNumber); - if (result != m_signalStatusMapping.end()) - { - return result->second; - } - - return false; -} - -void SignalHandlerHelper::resetSignal(int signalNumber) -{ - m_signalStatusMapping[signalNumber] = false; -} \ No newline at end of file diff --git a/common/signalhandlerhelper.h b/common/signalhandlerhelper.h deleted file mode 100644 index ee7633aaa..000000000 --- a/common/signalhandlerhelper.h +++ /dev/null @@ -1,36 +0,0 @@ -#pragma once -#include -#include - -namespace swss { - -typedef std::pair SigActionPair; - -// Define signal ID enum for python -enum Signals -{ - SIGNAL_TERM = SIGTERM, - SIGNAL_INT = SIGINT -}; - -/* - SignalHandlerHelper class provide a native signal handler. - Python signal handler have following issue: - A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes. - For more information, please check: https://docs.python.org/3/library/signal.html -*/ -class SignalHandlerHelper -{ -public: - static void registerSignalHandler(int signalNumber); - static void restoreSignalHandler(int signalNumber); - static void onSignal(int signalNumber); - static bool checkSignal(int signalNumber); - static void resetSignal(int signalNumber); - -private: - static std::map m_signalStatusMapping; - static std::map m_sigActionMapping; -}; - -} \ No newline at end of file diff --git a/pyext/swsscommon.i b/pyext/swsscommon.i index f3b15cd22..ebf3e65fd 100644 --- a/pyext/swsscommon.i +++ b/pyext/swsscommon.i @@ -19,7 +19,6 @@ #include "pubsub.h" #include "select.h" #include "selectable.h" -#include "signalhandlerhelper.h" #include "rediscommand.h" #include "table.h" #include "redispipeline.h" @@ -155,7 +154,6 @@ T castSelectableObj(swss::Selectable *temp) %include "pubsub.h" %include "selectable.h" %include "select.h" -%include "signalhandlerhelper.h" %include "rediscommand.h" %include "redispipeline.h" %include "redisselect.h" diff --git a/tests/test_signalhandler_ut.py b/tests/test_signalhandler_ut.py index 316f2f421..e5dc32840 100755 --- a/tests/test_signalhandler_ut.py +++ b/tests/test_signalhandler_ut.py @@ -4,36 +4,6 @@ import multiprocessing import time from swsscommon import swsscommon -from swsscommon.swsscommon import SignalHandlerHelper - -def dummy_signal_handler(signum, stack): - # ignore signal so UT will not break - pass - -def test_SignalHandler(): - signal.signal(signal.SIGUSR1, dummy_signal_handler) - - # Register SIGUSER1 - SignalHandlerHelper.registerSignalHandler(signal.SIGUSR1) - happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) - assert happened == False - - # trigger SIGUSER manually - os.kill(os.getpid(), signal.SIGUSR1) - happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) - assert happened == True - - # Reset signal - SignalHandlerHelper.resetSignal(signal.SIGUSR1) - happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) - assert happened == False - - # un-register signal handler - SignalHandlerHelper.restoreSignalHandler(signal.SIGUSR1) - os.kill(os.getpid(), signal.SIGUSR1) - happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1) - assert happened == False - def test_config_db_listen_while_signal_received(): """ Test performs ConfigDBConnector.listen() while signal is received, From 76a25b20acd04f3051b30d14cd3ade2ecc89c69b Mon Sep 17 00:00:00 2001 From: Stepan Blyschak Date: Wed, 15 Jun 2022 17:17:19 +0300 Subject: [PATCH 5/5] remove signalhandlerhelper.cpp Signed-off-by: Stepan Blyschak --- common/Makefile.am | 1 - 1 file changed, 1 deletion(-) diff --git a/common/Makefile.am b/common/Makefile.am index 53f8c87ad..4c400556d 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -44,7 +44,6 @@ libswsscommon_la_SOURCES = \ select.cpp \ selectableevent.cpp \ selectabletimer.cpp \ - signalhandlerhelper.cpp \ consumertable.cpp \ consumertablebase.cpp \ consumerstatetable.cpp \