Skip to content

Fix SIGTERM can't terminate PubSub::listen issue by add cancellation token support. #606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
May 19, 2022
Merged
1 change: 1 addition & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ libswsscommon_la_SOURCES = \
select.cpp \
selectableevent.cpp \
selectabletimer.cpp \
signalhandlerhelper.cpp \
consumertable.cpp \
consumertablebase.cpp \
consumerstatetable.cpp \
Expand Down
6 changes: 5 additions & 1 deletion common/configdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
if init_data_handler:
init_data_handler(init_callback_data)

while True:
while not SignalHandlerHelper.checkSignal(SIGNAL_INT):
item = self.pubsub.listen_message()
if 'type' not in item:
# When timeout or cancelled, item will not contains 'type'
continue

if item['type'] == 'pmessage':
key = item['channel'].split(':', 1)[1]
try:
Expand Down
27 changes: 19 additions & 8 deletions common/pubsub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,29 @@ bool PubSub::hasCachedData()

map<string, string> PubSub::get_message(double timeout)
{
map<string, string> ret;
return get_message_internal(timeout).second;
}

MessageResultPair PubSub::get_message_internal(double timeout)
{
MessageResultPair ret;

if (!m_subscribe)
{
ret.first = Select::OBJECT;
return ret;
}

Selectable *selected;
int rc = m_select.select(&selected, int(timeout));
ret.first = rc;
switch (rc)
{
case Select::ERROR:
throw RedisError("Failed to select", m_subscribe->getContext());

case Select::TIMEOUT:
case Select::SIGNALINT:
return ret;

case Select::OBJECT:
Expand All @@ -110,10 +119,10 @@ map<string, string> PubSub::get_message(double timeout)
}

auto message = event->getReply<RedisMessage>();
ret["type"] = message.type;
ret["pattern"] = message.pattern;
ret["channel"] = message.channel;
ret["data"] = message.data;
ret.second["type"] = message.type;
ret.second["pattern"] = message.pattern;
ret.second["channel"] = message.channel;
ret.second["data"] = message.data;
return ret;
}

Expand All @@ -124,12 +133,14 @@ std::map<std::string, std::string> PubSub::listen_message()
const double GET_MESSAGE_INTERVAL = 600.0; // in seconds
for (;;)
{
auto ret = get_message(GET_MESSAGE_INTERVAL);
if (!ret.empty())
auto ret = get_message_internal(GET_MESSAGE_INTERVAL);
if (!ret.second.empty() || ret.first == Select::SIGNALINT)
{
return ret;
return ret.second;
}
}

return map<string, string>();
}

shared_ptr<RedisReply> PubSub::popEventBuffer()
Expand Down
4 changes: 4 additions & 0 deletions common/pubsub.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#pragma once
#include <map>
#include <deque>
#include <utility>

#include "dbconnector.h"
#include "select.h"
#include "redisselect.h"

namespace swss {

typedef std::pair<int, std::map<std::string, std::string> > MessageResultPair;

// This class is to emulate python redis-py class PubSub
// After SWIG wrapping, it should be used in the same way
class PubSub : protected RedisSelect
Expand All @@ -29,6 +32,7 @@ class PubSub : protected RedisSelect
private:
/* Pop keyspace event from event buffer. Caller should free resources. */
std::shared_ptr<RedisReply> popEventBuffer();
MessageResultPair get_message_internal( double timeout = 0.0);

DBConnector *m_parentConnector;
Select m_select;
Expand Down
13 changes: 12 additions & 1 deletion common/select.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "common/selectable.h"
#include "common/logger.h"
#include "common/select.h"
#include "common/signalhandlerhelper.h"
#include <algorithm>
#include <stdio.h>
#include <sys/time.h>
Expand All @@ -9,6 +10,7 @@
#include <unistd.h>
#include <string.h>


using namespace std;

namespace swss {
Expand Down Expand Up @@ -97,7 +99,13 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout)
{
ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
}
while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal
while(ret == -1 && errno == EINTR && !SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT)); // Retry the select if the process was interrupted by a signal

if (SignalHandlerHelper::checkSignal(Signals::SIGNAL_INT))
{
// Return if the epoll_wait was interrupted by SIGTERM
return Select::SIGNALINT;
}

if (ret < 0)
return Select::ERROR;
Expand Down Expand Up @@ -190,6 +198,9 @@ std::string Select::resultToString(int result)
case swss::Select::TIMEOUT:
return "TIMEOUT";

case swss::Select::SIGNALINT:
return "SIGNALINT";

default:
SWSS_LOG_WARN("unknown select result: %d", result);
return "UNKNOWN";
Expand Down
1 change: 1 addition & 0 deletions common/select.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Select
OBJECT = 0,
ERROR = 1,
TIMEOUT = 2,
SIGNALINT = 3,// Read operation interrupted by SIGINT
};

int select(Selectable **c, int timeout = -1);
Expand Down
72 changes: 72 additions & 0 deletions common/signalhandlerhelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include <signal.h>
#include "common/logger.h"
#include "signalhandlerhelper.h"

using namespace swss;

std::map<int, bool> SignalHandlerHelper::m_signalStatusMapping;
std::map<int, SigActionPair> SignalHandlerHelper::m_sigActionMapping;

void SignalHandlerHelper::registerSignalHandler(int signalNumber)
{
auto result = m_sigActionMapping.find(signalNumber);
if (result != m_sigActionMapping.end())
{
// signal action already registered
SWSS_LOG_WARN("sigaction for %d already registered.", signalNumber);
return;
}

m_signalStatusMapping[signalNumber] = false;
auto *old_action = new struct sigaction();
auto *new_action = new struct sigaction();
new_action->sa_handler = SignalHandlerHelper::onSignal;
sigemptyset(&new_action->sa_mask);
new_action->sa_flags = 0;

// always replace old action even old action is ignore signal
sigaction(signalNumber, new_action, old_action);

SigActionPair sig_action_pair(new_action, old_action);
m_sigActionMapping[signalNumber] = sig_action_pair;
}

void SignalHandlerHelper::restoreSignalHandler(int signalNumber)
{
auto result = m_sigActionMapping.find(signalNumber);
if (result == m_sigActionMapping.end())
{
// signal action does not registered
SWSS_LOG_WARN("sigaction for %d does not registered.",signalNumber);
return;
}

auto *new_action = result->second.first;
auto *old_action = result->second.second;

sigaction(signalNumber, old_action, NULL);

delete new_action;
delete old_action;
}

void SignalHandlerHelper::onSignal(int signalNumber)
{
m_signalStatusMapping[signalNumber] = true;
}

bool SignalHandlerHelper::checkSignal(int signalNumber)
{
auto result = m_signalStatusMapping.find(signalNumber);
if (result != m_signalStatusMapping.end())
{
return result->second;
}

return false;
}

void SignalHandlerHelper::resetSignal(int signalNumber)
{
m_signalStatusMapping[signalNumber] = false;
}
36 changes: 36 additions & 0 deletions common/signalhandlerhelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once
#include <map>
#include <signal.h>

namespace swss {

typedef std::pair<struct sigaction*, struct sigaction*> SigActionPair;

// Define signal ID enum for python
enum Signals
{
SIGNAL_TERM = SIGTERM,
SIGNAL_INT = SIGINT
};

/*
SignalHandlerHelper class provide a native signal handler.
Python signal handler have following issue:
A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes.
For more information, please check: https://docs.python.org/3/library/signal.html
*/
class SignalHandlerHelper
{
public:
static void registerSignalHandler(int signalNumber);
static void restoreSignalHandler(int signalNumber);
static void onSignal(int signalNumber);
static bool checkSignal(int signalNumber);
static void resetSignal(int signalNumber);

private:
static std::map<int, bool> m_signalStatusMapping;
static std::map<int, SigActionPair> m_sigActionMapping;
};

}
2 changes: 2 additions & 0 deletions pyext/swsscommon.i
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "pubsub.h"
#include "select.h"
#include "selectable.h"
#include "signalhandlerhelper.h"
#include "rediscommand.h"
#include "table.h"
#include "redispipeline.h"
Expand Down Expand Up @@ -154,6 +155,7 @@ T castSelectableObj(swss::Selectable *temp)
%include "pubsub.h"
%include "selectable.h"
%include "select.h"
%include "signalhandlerhelper.h"
%include "rediscommand.h"
%include "redispipeline.h"
%include "redisselect.h"
Expand Down
33 changes: 33 additions & 0 deletions tests/test_signalhandler_ut.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import signal
import os
import pytest
from swsscommon import swsscommon
from swsscommon.swsscommon import SignalHandlerHelper

def dummy_signal_handler(signum, stack):
# ignore signal so UT will not break
pass

def test_SignalHandler():
signal.signal(signal.SIGUSR1, dummy_signal_handler)

# Register SIGUSER1
SignalHandlerHelper.registerSignalHandler(signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == False

# trigger SIGUSER manually
os.kill(os.getpid(), signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == True

# Reset signal
SignalHandlerHelper.resetSignal(signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == False

# un-register signal handler
SignalHandlerHelper.restoreSignalHandler(signal.SIGUSR1)
os.kill(os.getpid(), signal.SIGUSR1)
happened = SignalHandlerHelper.checkSignal(signal.SIGUSR1)
assert happened == False