Skip to content

Commit cc2f80b

Browse files
authored
Fix SubscriberStateTable::hasCachedData formula for a timing risk (#379)
* Fix hasCachedData formula * Add test case to verify the fix * Remove dead code in test
1 parent 200f2b0 commit cc2f80b

File tree

2 files changed

+119
-1
lines changed

2 files changed

+119
-1
lines changed

common/subscriberstatetable.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ bool SubscriberStateTable::hasData()
8989

9090
bool SubscriberStateTable::hasCachedData()
9191
{
92-
return m_buffer.size() > 1 || m_keyspace_event_buffer.size() > 1;
92+
return m_buffer.size() + m_keyspace_event_buffer.size() > 1;
9393
}
9494

9595
void SubscriberStateTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/)

tests/redis_subscriber_state_ut.cpp

+118
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,124 @@ TEST(SubscriberStateTable, set)
222222
}
223223
}
224224

225+
TEST(SubscriberStateTable, set2_pop1_set1_pop1)
226+
{
227+
clearDB();
228+
229+
/* Prepare producer */
230+
DBConnector db("TEST_DB", 0, true);
231+
Table p(&db, testTableName);
232+
int maxNumOfFields = 2;
233+
234+
/* Prepare subscriber */
235+
SubscriberStateTable c(&db, testTableName);
236+
Select cs;
237+
Selectable *selectcs;
238+
cs.addSelectable(&c);
239+
240+
/* Set 1st, 2nd */
241+
int index;
242+
for (index = 0; index < 2; index++)
243+
{
244+
vector<FieldValueTuple> fields;
245+
for (int j = 0; j < maxNumOfFields; j++)
246+
{
247+
FieldValueTuple t(field(index, j), value(index, j));
248+
fields.push_back(t);
249+
}
250+
p.set(key(index, 0), fields);
251+
}
252+
253+
/* Pop 1st */
254+
index = 0;
255+
{
256+
int ret = cs.select(&selectcs);
257+
EXPECT_EQ(ret, Select::OBJECT);
258+
KeyOpFieldsValuesTuple kco;
259+
c.pop(kco);
260+
261+
EXPECT_EQ(kfvKey(kco), key(index, 0));
262+
EXPECT_EQ(kfvOp(kco), "SET");
263+
264+
auto fvs = kfvFieldsValues(kco);
265+
EXPECT_EQ(fvs.size(), (unsigned int)(maxNumOfFields));
266+
267+
map<string, string> mm;
268+
for (auto fv: fvs)
269+
{
270+
mm[fvField(fv)] = fvValue(fv);
271+
}
272+
273+
for (int j = 0; j < maxNumOfFields; j++)
274+
{
275+
EXPECT_EQ(mm[field(index, j)], value(index, j));
276+
}
277+
}
278+
279+
/* Set 3rd */
280+
for (index = 2; index < 3; index++)
281+
{
282+
vector<FieldValueTuple> fields;
283+
for (int j = 0; j < maxNumOfFields; j++)
284+
{
285+
FieldValueTuple t(field(index, j), value(index, j));
286+
fields.push_back(t);
287+
}
288+
p.set(key(index, 0), fields);
289+
}
290+
291+
/* Pop 2nd */
292+
index = 1;
293+
{
294+
int ret = cs.select(&selectcs);
295+
EXPECT_EQ(ret, Select::OBJECT);
296+
KeyOpFieldsValuesTuple kco;
297+
c.pop(kco);
298+
EXPECT_EQ(kfvKey(kco), key(index, 0));
299+
EXPECT_EQ(kfvOp(kco), "SET");
300+
301+
auto fvs = kfvFieldsValues(kco);
302+
EXPECT_EQ(fvs.size(), (unsigned int)(maxNumOfFields));
303+
304+
map<string, string> mm;
305+
for (auto fv: fvs)
306+
{
307+
mm[fvField(fv)] = fvValue(fv);
308+
}
309+
310+
for (int j = 0; j < maxNumOfFields; j++)
311+
{
312+
EXPECT_EQ(mm[field(index, j)], value(index, j));
313+
}
314+
}
315+
316+
/* Pop 3rd */
317+
index = 2;
318+
{
319+
// Note: the code before this commit will hang at below select()
320+
int ret = cs.select(&selectcs);
321+
EXPECT_EQ(ret, Select::OBJECT);
322+
KeyOpFieldsValuesTuple kco;
323+
c.pop(kco);
324+
EXPECT_EQ(kfvKey(kco), key(index, 0));
325+
EXPECT_EQ(kfvOp(kco), "SET");
326+
327+
auto fvs = kfvFieldsValues(kco);
328+
EXPECT_EQ(fvs.size(), (unsigned int)(maxNumOfFields));
329+
330+
map<string, string> mm;
331+
for (auto fv: fvs)
332+
{
333+
mm[fvField(fv)] = fvValue(fv);
334+
}
335+
336+
for (int j = 0; j < maxNumOfFields; j++)
337+
{
338+
EXPECT_EQ(mm[field(index, j)], value(index, j));
339+
}
340+
}
341+
}
342+
225343
TEST(SubscriberStateTable, pops_intial)
226344
{
227345
clearDB();

0 commit comments

Comments
 (0)