Skip to content

Fix SIGTERM can't terminate PubSub::listen issue by add cancellation token support. #606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
May 19, 2022
Merged
2 changes: 2 additions & 0 deletions common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ libswsscommon_la_SOURCES = \
select.cpp \
selectableevent.cpp \
selectabletimer.cpp \
cancellationtoken.cpp \
consumertable.cpp \
consumertablebase.cpp \
consumerstatetable.cpp \
Expand All @@ -62,6 +63,7 @@ libswsscommon_la_SOURCES = \
tokenize.cpp \
exec.cpp \
saiaclschema.cpp \
signalhandlerhelper.cpp \
subscriberstatetable.cpp \
timestamp.cpp \
warm_restart.cpp \
Expand Down
29 changes: 29 additions & 0 deletions common/cancellationtoken.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "cancellationtoken.h"

using namespace swss;

CancellationToken::CancellationToken()
:m_cancled(false)
{
}

CancellationToken::~CancellationToken()
{
Cancel();
}

bool CancellationToken::IsCancled()
{
return m_cancled;
}

void CancellationToken::Cancel()
{
m_cancled = true;
}

void CancellationToken::Reset()
{
m_cancled = false;
}

18 changes: 18 additions & 0 deletions common/cancellationtoken.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

namespace swss {

class CancellationToken
{
public:
CancellationToken();
~CancellationToken();
bool IsCancled();
void Cancel();
void Reset();

private:
bool m_cancled;
};

}
13 changes: 11 additions & 2 deletions common/configdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native

## Note: callback is difficult to implement by SWIG C++, so keep in python
def listen(self, init_data_handler=None):
cancellation_token = CancellationToken();
return self.listen(cancellation_token, init_data_handler)

## Note: callback is difficult to implement by SWIG C++, so keep in python
def listen(self, cancellation_token, init_data_handler=None):
## Start listen Redis keyspace event. Pass a callback function to `init` to handle initial table data.
self.pubsub = self.get_redis_client(self.db_name).pubsub()
self.pubsub.psubscribe("__keyspace@{}__:*".format(self.get_dbid(self.db_name)))
Expand All @@ -94,8 +99,12 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
if init_data_handler:
init_data_handler(init_callback_data)

while True:
item = self.pubsub.listen_message()
while not cancellation_token.IsCancled():
item = self.pubsub.listen_message(cancellation_token)
if not item.has_key('type'):
# When timeout or cancled, item will not contains 'type'
continue

if item['type'] == 'pmessage':
key = item['channel'].split(':', 1)[1]
try:
Expand Down
22 changes: 19 additions & 3 deletions common/pubsub.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "pubsub.h"
#include "cancellationtoken.h"
#include "dbconnector.h"
#include "logger.h"
#include "redisreply.h"
Expand Down Expand Up @@ -78,6 +79,12 @@ bool PubSub::hasCachedData()
}

map<string, string> PubSub::get_message(double timeout)
{
CancellationToken cancellationToken;
return get_message(cancellationToken, timeout);
}

map<string, string> PubSub::get_message(CancellationToken &cancellationToken, double timeout)
{
map<string, string> ret;
if (!m_subscribe)
Expand All @@ -86,12 +93,13 @@ map<string, string> PubSub::get_message(double timeout)
}

Selectable *selected;
int rc = m_select.select(&selected, int(timeout));
int rc = m_select.select(&selected, cancellationToken, int(timeout));
switch (rc)
{
case Select::ERROR:
throw RedisError("Failed to select", m_subscribe->getContext());

case Select::CANCELLED:
case Select::TIMEOUT:
return ret;

Expand Down Expand Up @@ -120,16 +128,24 @@ map<string, string> PubSub::get_message(double timeout)
// Note: it is not straightforward to implement redis-py PubSub.listen() directly in c++
// due to the `yield` syntax, so we implement this function for blocking listen one message
std::map<std::string, std::string> PubSub::listen_message()
{
CancellationToken cancellationToken;
return listen_message(cancellationToken);
}

std::map<std::string, std::string> PubSub::listen_message(CancellationToken &cancellationToken)
{
const double GET_MESSAGE_INTERVAL = 600.0; // in seconds
for (;;)
while (!cancellationToken.IsCancled())
{
auto ret = get_message(GET_MESSAGE_INTERVAL);
auto ret = get_message(cancellationToken, GET_MESSAGE_INTERVAL);
if (!ret.empty())
{
return ret;
}
}

return map<string, string>();
}

shared_ptr<RedisReply> PubSub::popEventBuffer()
Expand Down
4 changes: 4 additions & 0 deletions common/pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

namespace swss {

class CancellationToken;

// This class is to emulate python redis-py class PubSub
// After SWIG wrapping, it should be used in the same way
class PubSub : protected RedisSelect
Expand All @@ -16,7 +18,9 @@ class PubSub : protected RedisSelect
explicit PubSub(DBConnector *other);

std::map<std::string, std::string> get_message(double timeout = 0.0);
std::map<std::string, std::string> get_message(CancellationToken &cancellationToken, double timeout = 0.0);
std::map<std::string, std::string> listen_message();
std::map<std::string, std::string> listen_message(CancellationToken &cancellationToken);

void psubscribe(const std::string &pattern);
void punsubscribe(const std::string &pattern);
Expand Down
24 changes: 20 additions & 4 deletions common/select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <unistd.h>
#include <string.h>


using namespace std;

namespace swss {
Expand Down Expand Up @@ -87,7 +88,7 @@ void Select::addSelectables(vector<Selectable *> selectables)
}
}

int Select::poll_descriptors(Selectable **c, unsigned int timeout)
int Select::poll_descriptors(Selectable **c, unsigned int timeout, CancellationToken &cancellationToken)
{
int sz_selectables = static_cast<int>(m_objects.size());
std::vector<struct epoll_event> events(sz_selectables);
Expand All @@ -96,8 +97,14 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout)
do
{
ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);

// sleep here make python signal handler not be blocked.
sleep(0);
}
while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal
while(ret == -1 && errno == EINTR && !cancellationToken.IsCancled()); // Retry the select if the process was interrupted by a signal

if (cancellationToken.IsCancled())
return Select::CANCELLED;

if (ret < 0)
return Select::ERROR;
Expand Down Expand Up @@ -149,6 +156,12 @@ int Select::poll_descriptors(Selectable **c, unsigned int timeout)
}

int Select::select(Selectable **c, int timeout)
{
CancellationToken cancellationToken;
return select(c, cancellationToken, timeout);
}

int Select::select(Selectable **c, CancellationToken &cancellationToken, int timeout)
{
SWSS_LOG_ENTER();

Expand All @@ -157,14 +170,14 @@ int Select::select(Selectable **c, int timeout)
*c = NULL;

/* check if we have some data */
ret = poll_descriptors(c, 0);
ret = poll_descriptors(c, 0, cancellationToken);

/* return if we have data, we have an error or desired timeout was 0 */
if (ret != Select::TIMEOUT || timeout == 0)
return ret;

/* wait for data */
ret = poll_descriptors(c, timeout);
ret = poll_descriptors(c, timeout, cancellationToken);

return ret;

Expand All @@ -190,6 +203,9 @@ std::string Select::resultToString(int result)
case swss::Select::TIMEOUT:
return "TIMEOUT";

case swss::Select::CANCELLED:
return "CANCELLED";

default:
SWSS_LOG_WARN("unknown select result: %d", result);
return "UNKNOWN";
Expand Down
5 changes: 4 additions & 1 deletion common/select.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <unordered_map>
#include <set>
#include <hiredis/hiredis.h>
#include "cancellationtoken.h"
#include "selectable.h"

namespace swss {
Expand All @@ -30,9 +31,11 @@ class Select
OBJECT = 0,
ERROR = 1,
TIMEOUT = 2,
CANCELLED = 3,
};

int select(Selectable **c, int timeout = -1);
int select(Selectable **c, CancellationToken &cancellationToken, int timeout = -1);
bool isQueueEmpty();

/**
Expand Down Expand Up @@ -65,7 +68,7 @@ class Select
}
};

int poll_descriptors(Selectable **c, unsigned int timeout);
int poll_descriptors(Selectable **c, unsigned int timeout, CancellationToken &cancellationToken);

int m_epoll_fd;
std::unordered_map<int, Selectable *> m_objects;
Expand Down
28 changes: 28 additions & 0 deletions common/signalhandlerhelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include <signal.h>

#include "cancellationtoken.h"
#include "signalhandlerhelper.h"

using namespace swss;

std::map<int, CancellationToken*> SignalHandlerHelper::signalTokenMapping;

void SignalHandlerHelper::RegisterSignalHandler(int signalNumber)
{
signal(signalNumber, SignalHandlerHelper::OnSignal);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it override the signal handler defined by the application? In case of python, does it mean that the user defined signal handler in python won't be executed?

Copy link
Contributor Author

@liuh-80 liuh-80 Apr 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because linux can only have 1 signal handler per-application.
If customer need define their signal handler, then customer also need cancel the token when signal happen in their handler, this class just a helper class.

}

void SignalHandlerHelper::RegisterCancellationToken(int signalNumber, CancellationToken& token)
{
signalTokenMapping[signalNumber] = &token;
}

void SignalHandlerHelper::OnSignal(int signalNumber)
{
auto result = signalTokenMapping.find(signalNumber);
if (result != signalTokenMapping.end())
{
result->second->Cancel();
}
}

25 changes: 25 additions & 0 deletions common/signalhandlerhelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once
#include <map>

namespace swss {

class CancellationToken;

/*
SignalHandlerHelper class provide a native signal handler.
Python signal handler have following issue:
A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes.
For more information, please check: https://docs.python.org/3/library/signal.html
*/
class SignalHandlerHelper
{
public:
static void RegisterSignalHandler(int signalNumber);
static void RegisterCancellationToken(int signalNumber, CancellationToken& token);
static void OnSignal(int signalNumber);

private:
static std::map<int, CancellationToken*> signalTokenMapping;
};

}
4 changes: 4 additions & 0 deletions pyext/swsscommon.i
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "pubsub.h"
#include "select.h"
#include "selectable.h"
#include "signalhandlerhelper.h"
#include "rediscommand.h"
#include "table.h"
#include "redispipeline.h"
Expand All @@ -34,6 +35,7 @@
#include "notificationproducer.h"
#include "warm_restart.h"
#include "logger.h"
#include "cancellationtoken.h"
#include "configdb.h"
#include "status_code_util.h"
%}
Expand Down Expand Up @@ -154,10 +156,12 @@ T castSelectableObj(swss::Selectable *temp)
%include "pubsub.h"
%include "selectable.h"
%include "select.h"
%include "signalhandlerhelper.h"
%include "rediscommand.h"
%include "redispipeline.h"
%include "redisselect.h"
%include "redistran.h"
%include "cancellationtoken.h"
%include "configdb.h"

%extend swss::DBConnector {
Expand Down