Skip to content

Fix SIGTERM can't terminate PubSub::listen issue by add cancellation token support. #606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
May 19, 2022
Merged
1 change: 1 addition & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ libswsscommon_la_SOURCES = \
select.cpp \
selectableevent.cpp \
selectabletimer.cpp \
signalhandlerhelper.cpp \
consumertable.cpp \
consumertablebase.cpp \
consumerstatetable.cpp \
Expand Down
6 changes: 5 additions & 1 deletion common/configdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
if init_data_handler:
init_data_handler(init_callback_data)

while True:
while not SignalHandlerHelper.checkSignal(SIGNAL_INT):
item = self.pubsub.listen_message()
if 'type' not in item:
# When timeout or cancelled, item will not contains 'type'
continue

if item['type'] == 'pmessage':
key = item['channel'].split(':', 1)[1]
try:
Expand Down
27 changes: 19 additions & 8 deletions common/pubsub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,29 @@ bool PubSub::hasCachedData()

map<string, string> PubSub::get_message(double timeout)
{
map<string, string> ret;
return get_message_internal(timeout).second;
}

pair<int, map<string, string> > PubSub::get_message_internal(double timeout)
{
pair<int, map<string, string> > ret;
ret.first = Select::OBJECT;

if (!m_subscribe)
{
return ret;
}

Selectable *selected;
int rc = m_select.select(&selected, int(timeout));
ret.first = rc;
switch (rc)
{
case Select::ERROR:
throw RedisError("Failed to select", m_subscribe->getContext());

case Select::TIMEOUT:
case Select::SIGNALINT:
return ret;

case Select::OBJECT:
Expand All @@ -110,10 +119,10 @@ map<string, string> PubSub::get_message(double timeout)
}

auto message = event->getReply<RedisMessage>();
ret["type"] = message.type;
ret["pattern"] = message.pattern;
ret["channel"] = message.channel;
ret["data"] = message.data;
ret.second["type"] = message.type;
ret.second["pattern"] = message.pattern;
ret.second["channel"] = message.channel;
ret.second["data"] = message.data;
return ret;
}

Expand All @@ -124,12 +133,14 @@ std::map<std::string, std::string> PubSub::listen_message()
const double GET_MESSAGE_INTERVAL = 600.0; // in seconds
for (;;)
{
auto ret = get_message(GET_MESSAGE_INTERVAL);
if (!ret.empty())
auto ret = get_message_internal(GET_MESSAGE_INTERVAL);
if (!ret.second.empty() || ret.first == Select::SIGNALINT)
{
return ret;
return ret.second;
}
}

return map<string, string>();
}

shared_ptr<RedisReply> PubSub::popEventBuffer()
Expand Down
2 changes: 2 additions & 0 deletions common/pubsub.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <map>
#include <deque>
#include <utility>

#include "dbconnector.h"
#include "select.h"
Expand Down Expand Up @@ -29,6 +30,7 @@ class PubSub : protected RedisSelect
private:
/* Pop keyspace event from event buffer. Caller should free resources. */
std::shared_ptr<RedisReply> popEventBuffer();
std::pair<int, std::map<std::string, std::string> > get_message_internal( double timeout = 0.0);

DBConnector *m_parentConnector;
Select m_select;
Expand Down
16 changes: 15 additions & 1 deletion common/select.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "common/selectable.h"
#include "common/logger.h"
#include "common/select.h"
#include "common/signalhandlerhelper.h"
#include <algorithm>
#include <stdio.h>
#include <sys/time.h>
Expand All @@ -9,6 +10,7 @@
#include <unistd.h>
#include <string.h>


using namespace std;

namespace swss {
Expand Down Expand Up @@ -96,8 +98,17 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout)
do
{
ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);

// sleep here make python signal handler not be blocked.
sleep(0);
}
while(ret == -1 && errno == EINTR && !SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)); // Retry the select if the process was interrupted by a signal

if (SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT))
{
// Return if the epoll_wait was interrupted by SIGTERM
return Select::SIGNALINT;
}
while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal

if (ret < 0)
return Select::ERROR;
Expand Down Expand Up @@ -190,6 +201,9 @@ std::string Select::resultToString(int result)
case swss::Select::TIMEOUT:
return "TIMEOUT";

case swss::Select::SIGNALINT:
return "SIGNALINT";

default:
SWSS_LOG_WARN("unknown select result: %d", result);
return "UNKNOWN";
Expand Down
1 change: 1 addition & 0 deletions common/select.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Select
OBJECT = 0,
ERROR = 1,
TIMEOUT = 2,
SIGNALINT = 3,// Read operation interrupted by SIGINT
};

int select(Selectable **c, int timeout = -1);
Expand Down
33 changes: 33 additions & 0 deletions common/signalhandlerhelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include <signal.h>

#include "signalhandlerhelper.h"

using namespace swss;

std::map<int, bool> SignalHandlerHelper::signalStatusMapping;

void SignalHandlerHelper::registerSignalHandler(int signalNumber)
{
signalStatusMapping[signalNumber] = false;
signal(signalNumber, SignalHandlerHelper::onSignal);
}

void SignalHandlerHelper::onSignal(int signalNumber)
{
signalStatusMapping[signalNumber] = true;
}

bool SignalHandlerHelper::checkSignal(int signalNumber)
{
auto result = signalStatusMapping.find(signalNumber);
if (result != signalStatusMapping.end()) {
return result->second;
}

return false;
}

void SignalHandlerHelper::resetSignal(int signalNumber)
{
signalStatusMapping[signalNumber] = false;
}
32 changes: 32 additions & 0 deletions common/signalhandlerhelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once
#include <map>
#include <signal.h>

namespace swss {

// Define signal ID enum for python
enum Signals
{
SIGNAL_TERM = SIGTERM,
SIGNAL_INT = SIGINT
};

/*
SignalHandlerHelper class provide a native signal handler.
Python signal handler have following issue:
A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes.
For more information, please check: https://docs.python.org/3/library/signal.html
*/
class SignalHandlerHelper
{
public:
static void registerSignalHandler(int signalNumber);
static void onSignal(int signalNumber);
static bool checkSignal(int signalNumber);
static void resetSignal(int signalNumber);

private:
static std::map<int, bool> signalStatusMapping;
};

}
2 changes: 2 additions & 0 deletions pyext/swsscommon.i
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "pubsub.h"
#include "select.h"
#include "selectable.h"
#include "signalhandlerhelper.h"
#include "rediscommand.h"
#include "table.h"
#include "redispipeline.h"
Expand Down Expand Up @@ -154,6 +155,7 @@ T castSelectableObj(swss::Selectable *temp)
%include "pubsub.h"
%include "selectable.h"
%include "select.h"
%include "signalhandlerhelper.h"
%include "rediscommand.h"
%include "redispipeline.h"
%include "redisselect.h"
Expand Down