Skip to content

Commit c3341c3

Browse files
committed
Move channel publish call into consumer_state_table_pops.lua to make them atomic.
Signed-off-by: Jipan Yang <[email protected]>
1 parent 43af582 commit c3341c3

File tree

3 files changed

+11
-22
lines changed

3 files changed

+11
-22
lines changed

common/consumer_state_table_pops.lua

+8
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,12 @@ for i = 1, n do
2020
-- Clean up the key in temporary state table
2121
redis.call('DEL', stateprefix..tablename..key)
2222
end
23+
24+
-- Check whether the keyset is empty, if not, signal ourselves to process again.
25+
-- This is to handle the case where the number of keys in keyset is more than POP_BATCH_SIZE
26+
local num = redis.call('SCARD', KEYS[1])
27+
if num > 0 then
28+
redis.call('PUBLISH', ARGV[3], 'G')
29+
end
30+
2331
return ret

common/consumerstatetable.cpp

+3-21
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,6 @@ ConsumerStateTable::ConsumerStateTable(DBConnector *db, const std::string &table
3434
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
3535
"end\n";
3636
m_multiPublish = loadRedisScript(m_db, luaMultiPublish);
37-
38-
std::string luaScardPublish =
39-
"local num = redis.call('SCARD', KEYS[2])\n"
40-
"if num > 0 then\n"
41-
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
42-
"end\n";
43-
m_scardPublish = loadRedisScript(m_db, luaScardPublish);
4437
}
4538

4639
void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string& /*prefix*/)
@@ -51,30 +44,19 @@ void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const st
5144

5245
RedisCommand command;
5346
command.format(
54-
"EVALSHA %s 3 %s %s: %s %d %s",
47+
"EVALSHA %s 3 %s %s: %s %d %s %s",
5548
sha.c_str(),
5649
getKeySetName().c_str(),
5750
getTableName().c_str(),
5851
getDelKeySetName().c_str(),
5952
POP_BATCH_SIZE,
60-
getStateHashPrefix().c_str());
53+
getStateHashPrefix().c_str(),
54+
getChannelName().c_str());
6155

6256
RedisReply r(m_db, command);
6357
auto ctx0 = r.getContext();
6458
vkco.clear();
6559

66-
// Check whether the keyset is empty, if not, signal ourselves to process again.
67-
// This is to handle the case where the number of keys in keyset is more than POP_BATCH_SIZE
68-
// Note there is possibility of false positive since this call is not atomic with consumer_state_table_pops.lua
69-
// Putting publish call inside consumer_state_table_pops.lua doesn't seem to work.
70-
command.format(
71-
"EVALSHA %s 2 %s %s %s",
72-
m_scardPublish.c_str(),
73-
getChannelName().c_str(),
74-
getKeySetName().c_str(),
75-
"G");
76-
RedisReply r2(m_db, command);
77-
7860
// if the set is empty, return an empty kco object
7961
if (ctx0->type == REDIS_REPLY_NIL)
8062
{

common/consumerstatetable.h

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ class ConsumerStateTable : public ConsumerTableBase, public TableName_KeySet
2020

2121
private:
2222
std::string m_multiPublish;
23-
std::string m_scardPublish;
2423
};
2524

2625
}

0 commit comments

Comments
 (0)