[select] break the select loop if interrupt_on_signal flag is set #624

Merged: 7 commits, Jun 30, 2022
1 change: 0 additions & 1 deletion common/Makefile.am
@@ -45,7 +45,6 @@ libswsscommon_la_SOURCES = \
select.cpp \
selectableevent.cpp \
selectabletimer.cpp \
signalhandlerhelper.cpp \
consumertable.cpp \
consumertablebase.cpp \
consumerstatetable.cpp \
10 changes: 5 additions & 5 deletions common/configdb.h
@@ -76,7 +76,7 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
## Start listen Redis keyspace event. Pass a callback function to `init` to handle initial table data.
self.pubsub = self.get_redis_client(self.db_name).pubsub()
self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name)))

# Build a cache of data for all subscribed tables that will recieve the initial table data so we dont send duplicate event notifications
init_data = {tbl: self.get_table(tbl) for tbl in self.handlers if init_data_handler or self.fire_init_data[tbl]}

@@ -88,16 +88,16 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
return False
return True

init_callback_data = {tbl: data for tbl, data in init_data.items() if load_data(tbl, data)}
init_callback_data = {tbl: data for tbl, data in init_data.items() if load_data(tbl, data)}

# Pass all initial data that we DID NOT send as updates to handlers through the init callback if provided by caller
if init_data_handler:
init_data_handler(init_callback_data)

while not SignalHandlerHelper.checkSignal(SIGNAL_INT):
item = self.pubsub.listen_message()
while True:
item = self.pubsub.listen_message(interrupt_on_signal=True)
if 'type' not in item:
# When timeout or cancelled, item will not contains 'type'
# When timeout or interrupted, item will not contains 'type'
continue

if item['type'] == 'pmessage':
12 changes: 6 additions & 6 deletions common/pubsub.cpp
@@ -77,12 +77,12 @@ bool PubSub::hasCachedData()
return m_keyspace_event_buffer.size() > 1;
}

map<string, string> PubSub::get_message(double timeout)
map<string, string> PubSub::get_message(double timeout, bool interrupt_on_signal)
{
return get_message_internal(timeout).second;
return get_message_internal(timeout, interrupt_on_signal).second;
}

MessageResultPair PubSub::get_message_internal(double timeout)
MessageResultPair PubSub::get_message_internal(double timeout, bool interrupt_on_signal)
{
MessageResultPair ret;

@@ -93,7 +93,7 @@ MessageResultPair PubSub::get_message_internal(double timeout)
}

Selectable *selected;
int rc = m_select.select(&selected, int(timeout));
int rc = m_select.select(&selected, int(timeout), interrupt_on_signal);
ret.first = rc;
switch (rc)
{
@@ -128,13 +128,13 @@ MessageResultPair PubSub::get_message_internal(double timeout)

// Note: it is not straightforward to implement redis-py PubSub.listen() directly in c++
// due to the `yield` syntax, so we implement this function for blocking listen one message
std::map<std::string, std::string> PubSub::listen_message()
std::map<std::string, std::string> PubSub::listen_message(bool interrupt_on_signal)
{
const double GET_MESSAGE_INTERVAL = 600.0; // in seconds
MessageResultPair ret;
for (;;)
Contributor:

This loop can also be removed: listen_message() is only used by the ConfigDBConnector::listen() method, and that method already does the retry.

If the loop is removed, some UTs will also need to be updated.

Please refer to my PR #629; I closed it because it does almost the same thing as this PR.

Contributor @qiluo-msft, Jun 16, 2022:

@liuh-80 Is it harmful to keep the loop?

Contributor:

It's not harmful to keep the loop.

Contributor Author:

@liuh-80 @qiluo-msft I have the same concerns about changing it, for the reason I described in my previous comment. A behaviour change is a different subject and not relevant to this PR. If it is decided that listen_message is not expected to be used by anyone, then this API should be made private and its behaviour may change. That deserves a separate PR. This PR is a bug fix.

{
ret = get_message_internal(GET_MESSAGE_INTERVAL);
ret = get_message_internal(GET_MESSAGE_INTERVAL, interrupt_on_signal);
if (!ret.second.empty() || ret.first == Select::SIGNALINT)
{
break;
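
For readers following the review thread on this loop, the exit condition can be modelled in a few lines of Python. This is only an illustrative sketch, not the library code: `get_message_internal` is passed in as a hypothetical callable returning a `(result_code, message)` pair, `SIGNALINT = 3` is copied from the `Select` enum in common/select.h, and returning the message after the loop is an assumption, since that part of the function is outside the hunk.

```python
SIGNALINT = 3                 # mirrors Select::SIGNALINT in common/select.h
GET_MESSAGE_INTERVAL = 600.0  # seconds, as in PubSub::listen_message


def listen_message(get_message_internal, interrupt_on_signal=False):
    """Block until a message arrives or a signal interrupts the wait.

    get_message_internal is a hypothetical stand-in for
    PubSub::get_message_internal; it must return (result_code, message_dict).
    """
    while True:
        rc, message = get_message_internal(GET_MESSAGE_INTERVAL, interrupt_on_signal)
        # Leave the loop on a non-empty message, or when select reported
        # that a signal interrupted the wait.
        if message or rc == SIGNALINT:
            return message
```

With `interrupt_on_signal=True` an interrupted wait yields an empty message, which the caller in `ConfigDBConnector.listen()` skips because it has no `'type'` key. That is consistent with the reviewers' conclusion that keeping the inner loop is harmless.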
6 changes: 3 additions & 3 deletions common/pubsub.h
@@ -18,8 +18,8 @@ class PubSub : protected RedisSelect
public:
explicit PubSub(DBConnector *other);

std::map<std::string, std::string> get_message(double timeout = 0.0);
std::map<std::string, std::string> listen_message();
std::map<std::string, std::string> get_message(double timeout = 0.0, bool interrupt_on_signal = false);
std::map<std::string, std::string> listen_message(bool interrupt_on_signal = false);

void psubscribe(const std::string &pattern);
void punsubscribe(const std::string &pattern);
@@ -32,7 +32,7 @@ class PubSub : protected RedisSelect
private:
/* Pop keyspace event from event buffer. Caller should free resources. */
std::shared_ptr<RedisReply> popEventBuffer();
MessageResultPair get_message_internal(double timeout = 0.0);
MessageResultPair get_message_internal(double timeout = 0.0, bool interrupt_on_signal = false);

DBConnector *m_parentConnector;
Select m_select;
31 changes: 19 additions & 12 deletions common/select.cpp
@@ -1,7 +1,6 @@
#include "common/selectable.h"
#include "common/logger.h"
#include "common/select.h"
#include "common/signalhandlerhelper.h"
#include <algorithm>
#include <stdio.h>
#include <sys/time.h>
@@ -89,26 +88,34 @@ void Select::addSelectables(vector<Selectable *> selectables)
}
}

int Select::poll_descriptors(Selectable **c, unsigned int timeout)
int Select::poll_descriptors(Selectable **c, unsigned int timeout, bool interrupt_on_signal = false)
{
int sz_selectables = static_cast<int>(m_objects.size());
std::vector<struct epoll_event> events(sz_selectables);
int ret;

do
while(true)
{
ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
}
while(ret == -1 && errno == EINTR && !SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)); // Retry the select if the process was interrupted by a signal

if (SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT))
{
// Return if the epoll_wait was interrupted by SIGTERM
return Select::SIGNALINT;
// on signal interrupt check if we need to return
if (ret == -1 && errno == EINTR)
{
if (interrupt_on_signal)
{
return Select::SIGNALINT;
}
}
// on all other errors break the loop
else
{
break;
}
}

if (ret < 0)
{
return Select::ERROR;
}

for (int i = 0; i < ret; ++i)
{
@@ -156,7 +163,7 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout)
return Select::TIMEOUT;
}

int Select::select(Selectable **c, int timeout)
int Select::select(Selectable **c, int timeout, bool interrupt_on_signal)
{
SWSS_LOG_ENTER();

@@ -172,7 +179,7 @@ int Select::select(Selectable **c, int timeout)
return ret;

/* wait for data */
ret = poll_descriptors(c, timeout);
ret = poll_descriptors(c, timeout, interrupt_on_signal);

return ret;
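
The retry policy that the new `poll_descriptors` applies around `epoll_wait` can be summarised with a small Python model. This is a sketch under stated assumptions rather than a port: `wait` is a hypothetical callable standing in for `::epoll_wait`, an `InterruptedError` models `ret == -1 && errno == EINTR`, and the per-event handling of ready descriptors is collapsed into a single `OBJECT` result. The numeric codes are taken from the `Select` enum in common/select.h.

```python
OBJECT, ERROR, TIMEOUT, SIGNALINT = 0, 1, 2, 3  # values from common/select.h


def poll_descriptors(wait, interrupt_on_signal=False):
    """Model of the EINTR handling in Select::poll_descriptors.

    wait is a hypothetical stand-in for ::epoll_wait: it returns the number
    of ready descriptors, raises InterruptedError for EINTR, or raises
    another OSError for any other failure.
    """
    while True:
        try:
            ready = wait()
        except InterruptedError:
            # A signal interrupted the wait: report it only if the caller
            # asked for that, otherwise retry the wait.
            if interrupt_on_signal:
                return SIGNALINT
            continue
        except OSError:
            return ERROR
        # Simplification: the real code walks the ready events here.
        return OBJECT if ready > 0 else TIMEOUT
```

The default of `interrupt_on_signal=False` keeps the previous behaviour for existing callers (retry on `EINTR`), while a caller such as the Python `listen()` loop can opt in so that control returns to the interpreter and its own signal handlers get a chance to run.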

6 changes: 3 additions & 3 deletions common/select.h
@@ -30,10 +30,10 @@ class Select
OBJECT = 0,
ERROR = 1,
TIMEOUT = 2,
SIGNALINT = 3,// Read operation interrupted by SIGINT
SIGNALINT = 3,// Read operation interrupted by a signal
};

int select(Selectable **c, int timeout = -1);
int select(Selectable **c, int timeout = -1, bool interrupt_on_signal = false);
bool isQueueEmpty();

/**
@@ -66,7 +66,7 @@ class Select
}
};

int poll_descriptors(Selectable **c, unsigned int timeout);
int poll_descriptors(Selectable **c, unsigned int timeout, bool interrupt_on_signal);

int m_epoll_fd;
std::unordered_map<int, Selectable *> m_objects;
70 changes: 0 additions & 70 deletions common/signalhandlerhelper.cpp

This file was deleted.

36 changes: 0 additions & 36 deletions common/signalhandlerhelper.h

This file was deleted.

2 changes: 0 additions & 2 deletions pyext/swsscommon.i
@@ -19,7 +19,6 @@
#include "pubsub.h"
#include "select.h"
#include "selectable.h"
#include "signalhandlerhelper.h"
#include "rediscommand.h"
#include "table.h"
#include "countertable.h"
@@ -156,7 +155,6 @@ T castSelectableObj(swss::Selectable *temp)
%include "pubsub.h"
%include "selectable.h"
%include "select.h"
%include "signalhandlerhelper.h"
%include "rediscommand.h"
%include "redispipeline.h"
%include "redisselect.h"
59 changes: 30 additions & 29 deletions tests/test_signalhandler_ut.py
@@ -1,33 +1,34 @@
import signal
import os
import sys
import pytest
import multiprocessing
import time
from swsscommon import swsscommon
from swsscommon.swsscommon import SignalHandlerHelper

def dummy_signal_handler(signum, stack):
# ignore signal so UT will not break
pass

def test_SignalHandler():
signal.signal(signal.SIGUSR1, dummy_signal_handler)

# Register SIGUSER1
SignalHandlerHelper.registerSignalHandler(signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == False

# trigger SIGUSER manually
os.kill(os.getpid(), signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == True

# Reset signal
SignalHandlerHelper.resetSignal(signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == False

# un-register signal handler
SignalHandlerHelper.restoreSignalHandler(signal.SIGUSR1)
os.kill(os.getpid(), signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == False

def test_config_db_listen_while_signal_received():
""" Test performs ConfigDBConnector.listen() while signal is received,
checks that the listen() call is interrupted and the regular KeyboardInterrupt is raised.
"""
c=swsscommon.ConfigDBConnector()
c.subscribe('A', lambda a: None)
c.connect(wait_for_init=False)
event = multiprocessing.Event()

def signal_handler(signum, frame):
event.set()
sys.exit(0)

signal.signal(signal.SIGUSR1, signal_handler)

def listen():
c.listen()

thr = multiprocessing.Process(target=listen)
thr.start()

time.sleep(5)
os.kill(thr.pid, signal.SIGUSR1)

thr.join()

assert event.is_set()