
Reduce unnecessary producerStateTable publish notifications #258


Open · wants to merge 4 commits into master
8 changes: 8 additions & 0 deletions common/consumer_state_table_pops.lua
@@ -20,4 +20,12 @@ for i = 1, n do
-- Clean up the key in temporary state table
redis.call('DEL', stateprefix..tablename..key)
end

-- Check whether the keyset is empty; if not, signal ourselves to process again.
-- This handles the case where the keyset holds more keys than POP_BATCH_SIZE.
local num = redis.call('SCARD', KEYS[1])
if num > 0 then
redis.call('PUBLISH', ARGV[3], 'G')
end

return ret
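
For context, here is a minimal consumer-side drain loop that relies on this self-publish when the keyset holds more keys than POP_BATCH_SIZE. This is a sketch only: it uses the Select/ConsumerStateTable API as exercised by the tests in this PR, while the table name, DB index, and include paths are assumptions rather than part of the change.

// Sketch: drain a keyset that may exceed POP_BATCH_SIZE in a single wake-up.
// Include paths, DB index, and table name are assumed, not taken from this PR.
#include <deque>
#include <iostream>
#include "common/dbconnector.h"
#include "common/consumerstatetable.h"
#include "common/select.h"

using namespace swss;

int main()
{
    DBConnector db(0, "localhost", 6379, 0);          // assumed endpoint/DB index
    ConsumerStateTable table(&db, "EXAMPLE_TABLE");   // hypothetical table name

    Select s;
    s.addSelectable(&table);

    Selectable *sel;
    while (s.select(&sel, 1000) == Select::OBJECT)
    {
        // One notification may stand for many pending keys. pops() takes at most
        // POP_BATCH_SIZE of them; the Lua change above publishes to the channel
        // again when keys remain, so select() fires once more and the loop keeps
        // draining without any additional producer-side notifications.
        std::deque<KeyOpFieldsValuesTuple> entries;
        table.pops(entries);
        for (const auto &kco : entries)
            std::cout << kfvKey(kco) << " " << kfvOp(kco) << std::endl;
    }
    return 0;
}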
35 changes: 33 additions & 2 deletions common/consumerstatetable.cpp
@@ -28,6 +28,12 @@ ConsumerStateTable::ConsumerStateTable(DBConnector *db, const std::string &table

RedisReply r(dequeueReply());
setQueueLength(r.getReply<long long int>());

std::string luaMultiPublish =
"for i = 1, KEYS[2] do\n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_multiPublish = loadRedisScript(m_db, luaMultiPublish);
}

void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string& /*prefix*/)
@@ -38,13 +44,14 @@ void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const st

RedisCommand command;
command.format(
"EVALSHA %s 3 %s %s: %s %d %s",
"EVALSHA %s 3 %s %s: %s %d %s %s",
sha.c_str(),
getKeySetName().c_str(),
getTableName().c_str(),
getDelKeySetName().c_str(),
POP_BATCH_SIZE,
getStateHashPrefix().c_str());
getStateHashPrefix().c_str(),
getChannelName().c_str());

RedisReply r(m_db, command);
auto ctx0 = r.getContext();
@@ -92,4 +99,28 @@ void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const st
}
}

void ConsumerStateTable::pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix)
{
pop(kfvKey(kco), kfvOp(kco), kfvFieldsValues(kco), prefix);
}

void ConsumerStateTable::pop(std::string &key, std::string &op, std::vector<FieldValueTuple> &fvs, const std::string &prefix)
{
bool emptyBuff = m_buffer.empty();
ConsumerTableBase::pop(key, op, fvs, prefix);

// Generate a notification for each remaining buffered key, since users of the pop call expect one notification per key.
if (emptyBuff && !m_buffer.empty())
{
RedisCommand command;
command.format(
"EVALSHA %s 2 %s %d %s ",
m_multiPublish.c_str(),
getChannelName().c_str(),
m_buffer.size(),
"G");
RedisReply r(m_db, command);
}
}

}
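
The pop() override above republishes because a caller that consumes one key per select() wake-up would otherwise stall: the producer now sends a single notification for a whole keyset, while pop() may buffer several keys at once. A sketch of such a caller follows, under the same assumptions as the earlier sketch (test-style API, hypothetical table name and endpoint).

// Sketch: one-key-per-notification consumer that depends on the republish added
// to ConsumerStateTable::pop(). Setup details are assumed, not part of this PR.
#include <iostream>
#include "common/dbconnector.h"
#include "common/consumerstatetable.h"
#include "common/select.h"

using namespace swss;

int main()
{
    DBConnector db(0, "localhost", 6379, 0);         // assumed endpoint/DB index
    ConsumerStateTable table(&db, "EXAMPLE_TABLE");  // hypothetical table name

    Select s;
    s.addSelectable(&table);

    Selectable *sel;
    while (s.select(&sel, 1000) == Select::OBJECT)
    {
        // Suppose the producer set keys k1..k3 and, with this PR, published only
        // once. The first pop() refills m_buffer, returns k1, and then publishes
        // m_buffer.size() extra 'G' messages, so select() keeps returning OBJECT
        // until k2 and k3 have been popped as well.
        KeyOpFieldsValuesTuple kco;
        table.pop(kco);
        std::cout << kfvKey(kco) << " " << kfvOp(kco) << std::endl;
    }
    return 0;
}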
6 changes: 6 additions & 0 deletions common/consumerstatetable.h
@@ -14,6 +14,12 @@ class ConsumerStateTable : public ConsumerTableBase, public TableName_KeySet

/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);

void pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix = EMPTY_PREFIX) override;
void pop(std::string &key, std::string &op, std::vector<FieldValueTuple> &fvs, const std::string &prefix = EMPTY_PREFIX) override;

private:
std::string m_multiPublish;
};

}
4 changes: 2 additions & 2 deletions common/consumertablebase.h
@@ -16,9 +16,9 @@ class ConsumerTableBase: public TableConsumable, public RedisTransactioner

const DBConnector* getDbConnector() const;

void pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix = EMPTY_PREFIX);
virtual void pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix = EMPTY_PREFIX);

void pop(std::string &key, std::string &op, std::vector<FieldValueTuple> &fvs, const std::string &prefix = EMPTY_PREFIX);
virtual void pop(std::string &key, std::string &op, std::vector<FieldValueTuple> &fvs, const std::string &prefix = EMPTY_PREFIX);

bool empty() const { return m_buffer.empty(); };
protected:
13 changes: 7 additions & 6 deletions common/producerstatetable.cpp
@@ -27,23 +27,24 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
, m_tempViewActive(false)
, m_pipe(pipeline)
{
// num in luaSet and luaDel means the number of elements that were added to the key set,
// not including the elements already present in the set.
// A publish notification is generated only when the keyset was empty.
string luaSet =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"redis.call('SADD', KEYS[2], ARGV[2])\n"
"for i = 0, #KEYS - 3 do\n"
" redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
"end\n"
" if added > 0 then \n"
"local num = redis.call('SCARD', KEYS[2])\n"
Contributor:
It is probably better to move this check before we add the current key itself (before "redis.call('SADD', KEYS[2], ARGV[2])\n"), and check whether "num == 0". With the code as is, if the current key is the same as an already existing key, we could end up publishing more notifications than needed.

" if num == 1 then \n"
Contributor:
Also, since we now have only one notification at the producer side for the keyset, we might want to be cautious about the case where that notification gets dropped. In such a case, the keyset will never be served to the consumer even if we keep updating keys/values in it, as we will never generate notifications at the producer again. Some mechanism might be needed here to prevent such cases.

Contributor Author (@jipanyang, Feb 25, 2019):
It is unlikely that redis will drop the notification; in that case, probably a lot of bad things happened before the error.

But I agree this is a valid concern; we should avoid a single point of failure here. Currently I'm considering this extra check:

1. On the producerStateTable side, together with the notification, it also leaves one field/value pair under _PRODUCESTATE_PUBLISH_REC, e.g.:

127.0.0.1:6380> hset _PRODUCESTATE_PUBLISH_REC ROUTE_TABLE_CHANNEL 1
(integer) 1
127.0.0.1:6380> hset _PRODUCESTATE_PUBLISH_REC PORT_TABLE_CHANNEL 1
(integer) 1
127.0.0.1:6380> hgetall _PRODUCESTATE_PUBLISH_REC
1) "ROUTE_TABLE_CHANNEL"
2) "1"
3) "PORT_TABLE_CHANNEL"
4) "1"

2. On the consumerStateTable side, after consuming all the data, it deletes the field/value pair:

127.0.0.1:6380> hdel _PRODUCESTATE_PUBLISH_REC ROUTE_TABLE_CHANNEL
(integer) 1
127.0.0.1:6380> hdel _PRODUCESTATE_PUBLISH_REC PORT_TABLE_CHANNEL
(integer) 1

3. The caller of consumerStateTable, which is orchagent, checks the _PRODUCESTATE_PUBLISH_REC key after each Select::TIMEOUT event:

127.0.0.1:6380> hgetall _PRODUCESTATE_PUBLISH_REC
(empty list or set)

If it is not empty, the notification has likely been lost; generate a new channel notification for each FV under the key so the pending data can be picked up.

Due to a race condition, there is a very small possibility of a false positive. We generate one extra notification; that is ok.
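
A rough sketch of what the orchagent-side part of this proposal could look like after a Select::TIMEOUT is shown below. It is purely illustrative of the idea above: the _PRODUCESTATE_PUBLISH_REC key, the helper name, and the include paths are hypothetical, and nothing like this is implemented in this PR.

// Hypothetical recovery check, following the mechanism proposed in the comment
// above: if any field is left under _PRODUCESTATE_PUBLISH_REC after a timeout,
// re-publish on the corresponding channel so the pending keyset gets served.
#include <string>
#include "common/dbconnector.h"
#include "common/rediscommand.h"
#include "common/redisreply.h"

using namespace swss;

void republishPendingKeysets(DBConnector &db)   // hypothetical helper
{
    RedisCommand hgetall;
    hgetall.format("HGETALL %s", "_PRODUCESTATE_PUBLISH_REC");
    RedisReply reply(&db, hgetall);

    // HGETALL returns field/value pairs flattened into one array:
    // field1, value1, field2, value2, ...
    auto ctx = reply.getContext();
    for (size_t i = 0; i + 1 < ctx->elements; i += 2)
    {
        std::string channel = ctx->element[i]->str;

        // The original notification may have been lost; one extra publish in the
        // no-loss case is harmless (the race condition discussed below).
        RedisCommand publish;
        publish.format("PUBLISH %s %s", channel.c_str(), "G");
        RedisReply r(&db, publish);
    }
}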

Contributor:
The mechanism looks ok to me. A few questions:

1. "If it is not empty, the notification has likely been lost; generate a new channel notification for each FV under the key so the pending data can be picked up."
Any reason we do a notification per FV? I would think we should generate the notification per keyset, as before, for the consumer to pick up.

2. "Due to a race condition"
Where exactly is the race condition? If after the time-out there could be a notification before we check _PRODUCESTATE_PUBLISH_REC, could we check the notification and the record together there atomically?

Contributor Author:
FV here is the FV under _PRODUCESTATE_PUBLISH_REC, e.g.:

1) "ROUTE_TABLE_CHANNEL"
2) "1"
3) "PORT_TABLE_CHANNEL"
4) "1"

Each FV represents one keyset which has pending data.

For the race condition, consider this scenario:

1. Orchagent has been idle for 1 second, so the timeout signal will kick in.

2. neighsyncd has discovered a new neighbor and programmed NEIGH_TABLE, then generated a redis notification.

3. At that exact moment, if the timeout signal is processed by orchagent before the redis notification, while neighsyncd has already put data under the _PRODUCESTATE_PUBLISH_REC key, orchagent will create one extra redis publish notification for NEIGH_TABLE, though immediately after that the real notification from neighsyncd will be processed.

This would be a really rare scenario, and the extra notification is harmless other than causing orchagent to perform one more check.

" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaSet = m_pipe->loadRedisScript(luaSet);

string luaDel =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"redis.call('SADD', KEYS[2], ARGV[2])\n"
"redis.call('SADD', KEYS[4], ARGV[2])\n"
"redis.call('DEL', KEYS[3])\n"
"if added > 0 then \n"
"local num = redis.call('SCARD', KEYS[2])\n"
Contributor:
Same comment as before: check the keyset cardinality before we add the new key, to be more precise.

"if num == 1 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaDel = m_pipe->loadRedisScript(luaDel);
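
To make the review suggestion above concrete, this is one way the reordered check could look for luaSet: read the keyset cardinality before the SADD and publish only when it was zero. This is the reviewer's idea rendered as a sketch, not code from this PR; luaDel would change in the same way.

string luaSet =
    "local num = redis.call('SCARD', KEYS[2])\n"
    "redis.call('SADD', KEYS[2], ARGV[2])\n"
    "for i = 0, #KEYS - 3 do\n"
    "    redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
    "end\n"
    "if num == 0 then\n"
    "    redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
    "end\n";

With the SCARD taken before the SADD, re-setting a key that is already pending (so the keyset size stays at 1) no longer triggers an extra publish, which is exactly the case the reviewer points out.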
2 changes: 1 addition & 1 deletion pyext/swsscommon.i
@@ -67,12 +67,12 @@
%apply std::string& OUTPUT {std::string &op};
%apply std::vector<std::pair<std::string, std::string>>& OUTPUT {std::vector<std::pair<std::string, std::string>> &fvs};
%include "consumertablebase.h"
%include "consumerstatetable.h"
%clear std::string &key;
%clear std::string &op;
%clear std::vector<std::pair<std::string, std::string>> &fvs;

%include "consumertable.h"
%include "consumerstatetable.h"
%include "subscriberstatetable.h"

%apply std::string& OUTPUT {std::string &op};
6 changes: 3 additions & 3 deletions tests/redis_piped_state_ut.cpp
@@ -469,7 +469,7 @@ TEST(ConsumerStateTable, async_singlethread)
EXPECT_EQ(p.count(), 0);
RedisReply r2(&db, queryCommand.c_str(), REDIS_REPLY_ARRAY);
EXPECT_EQ(r2.getContext()->elements, (size_t)0);

int numberOfNotification = 0;
while ((ret = cs.select(&selectcs, 1000)) == Select::OBJECT)
{
Expand All @@ -483,8 +483,8 @@ TEST(ConsumerStateTable, async_singlethread)
}
EXPECT_EQ(p.count(), 0);

// ConsumerStateTable got all the notifications though no real data available.
EXPECT_EQ(NUMBER_OF_OPS, numberOfNotification);
// ConsumerStateTable got fewer notifications than the number of operations.
EXPECT_GT(NUMBER_OF_OPS, numberOfNotification);
EXPECT_EQ(ret, Select::TIMEOUT);

cout << "Done. Waiting for all job to finish " << NUMBER_OF_OPS << " jobs." << endl;
60 changes: 51 additions & 9 deletions tests/redis_state_ut.cpp
@@ -480,22 +480,13 @@ TEST(ConsumerStateTable, set_pop_del_set_pop_get)
* Third pop operation, consumer received two consecutive signals.
* data depleted upon first one
*/
/*
{
int ret = cs.select(&selectcs, 1000);
EXPECT_EQ(ret, Select::OBJECT);
KeyOpFieldsValuesTuple kco;
c.pop(kco);
EXPECT_EQ(kfvKey(kco), "");
}
*/

/*
* With the optimization in ProducerStateTable set/del operation,
* if there are pending operations on the same key, the producer won't publish
* a redundant notification again.
* So the check above was commented out.
*/

/* Third select operation */
{
@@ -955,6 +946,57 @@ TEST(ConsumerStateTable, view_switch_delete_with_consumer_2)
EXPECT_EQ(r8.getContext()->elements, (size_t) 0);
}

TEST(ConsumerStateTable, consumerBeforeProducer)
{
clearDB();

int index = 0;
string tableName = "UT_REDIS_THREAD_" + to_string(index);
DBConnector db(TEST_DB, "localhost", 6379, 0);
// Set popBatchSize to 100.
// Consumer will get only one notification, but it should be able to
// read all the data via multiple batch pops.
ConsumerStateTable c(&db, tableName, 100);
ProducerStateTable p(&db, tableName);

for (int i = 0; i < NUMBER_OF_OPS; i++)
{
vector<FieldValueTuple> fields;
int maxNumOfFields = getMaxFields(i);
for (int j = 0; j < maxNumOfFields; j++)
{
FieldValueTuple t(field(j), value(j));
fields.push_back(t);
}
if ((i % 100) == 0)
cout << "+" << flush;

p.set(key(i), fields);
}

Select cs;
Selectable *selectcs;
int ret, i = 0;
KeyOpFieldsValuesTuple kco;

cs.addSelectable(&c);
int numberOfKeysSet = 0;
while ((ret = cs.select(&selectcs, 1000)) == Select::OBJECT)
{
c.pop(kco);
EXPECT_EQ(kfvOp(kco), "SET");
numberOfKeysSet++;
validateFields(kfvKey(kco), kfvFieldsValues(kco));

if ((i++ % 100) == 0)
cout << "-" << flush;
}
EXPECT_EQ(numberOfKeysSet, NUMBER_OF_OPS);

cout << endl << "Done." << endl;
}


TEST(ConsumerStateTable, singlethread)
{
clearDB();