Skip to content

Commit cda2a3a

Browse files
committed
Check type in blocking functions.
Add more timeout checks. Add test.
1 parent 35e7161 commit cda2a3a

File tree

5 files changed

+288
-94
lines changed

5 files changed

+288
-94
lines changed

libs/server/Objects/ItemBroker/CollectionItemBroker.cs

+67-12
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ private void InitializeObserver(CollectionItemObserver observer, byte[][] keys)
235235
// If the key already has a non-empty observer queue, it does not have an item to retrieve
236236
// Otherwise, try to retrieve next available item
237237
if ((KeysToObservers.ContainsKey(key) && KeysToObservers[key].Count > 0) ||
238-
!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs,
238+
!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs, true,
239239
out _, out var result)) continue;
240240

241241
// An item was found - set the observer result and return
@@ -291,7 +291,7 @@ private bool TryAssignItemFromKey(byte[] key)
291291
}
292292

293293
// Try to get next available item from object stored in key
294-
if (!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs,
294+
if (!TryGetResult(key, observer.Session.storageSession, observer.Command, observer.CommandArgs, false,
295295
out var currCount, out var result))
296296
{
297297
// If unsuccessful getting next item but there is at least one item in the collection,
@@ -436,7 +436,9 @@ private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sort
436436
}
437437
}
438438

439-
private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, RespCommand command, ArgSlice[] cmdArgs, out int currCount, out CollectionItemResult result)
439+
private unsafe bool TryGetResult(byte[] key, StorageSession storageSession,
440+
RespCommand command, ArgSlice[] cmdArgs, bool initial,
441+
out int currCount, out CollectionItemResult result)
440442
{
441443
currCount = default;
442444
result = default;
@@ -461,26 +463,65 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
461463
Debug.Assert(storageSession.txnManager.state == TxnState.None);
462464
createTransaction = true;
463465
var asKey = storageSession.scratchBufferManager.CreateArgSlice(key);
466+
if (initial)
467+
storageSession.txnManager.SaveKeyEntryToLock(asKey, false, LockType.Exclusive);
464468
storageSession.txnManager.SaveKeyEntryToLock(asKey, true, LockType.Exclusive);
465469

466470
if (command == RespCommand.BLMOVE)
467471
{
472+
if (initial)
473+
storageSession.txnManager.SaveKeyEntryToLock(dstKey, false, LockType.Exclusive);
468474
storageSession.txnManager.SaveKeyEntryToLock(dstKey, true, LockType.Exclusive);
469475
}
470476

471477
_ = storageSession.txnManager.Run(true);
472478
}
473479

474480
var objectLockableContext = storageSession.txnManager.ObjectStoreLockableContext;
481+
IGarnetObject dstObj = null;
482+
byte[] arrDstKey = default;
475483

476484
try
477485
{
478486
// Get the object stored at key
479487
var statusOp = storageSession.GET(key, out var osObject, ref objectLockableContext);
480-
if (statusOp == GarnetStatus.NOTFOUND) return false;
488+
if (statusOp == GarnetStatus.NOTFOUND)
489+
{
490+
if (!initial)
491+
return false;
492+
493+
var context = storageSession.txnManager.LockableContext;
494+
495+
var keySlice = storageSession.scratchBufferManager.CreateArgSlice(key);
496+
statusOp = storageSession.GET(keySlice, out ArgSlice _, ref context);
497+
498+
if (statusOp != GarnetStatus.NOTFOUND)
499+
{
500+
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
501+
return true;
502+
}
503+
504+
if (command == RespCommand.BLMOVE)
505+
{
506+
arrDstKey = dstKey.ToArray();
507+
var dstStatusOp = storageSession.GET(arrDstKey, out var osDstObject, ref objectLockableContext);
508+
if (dstStatusOp != GarnetStatus.NOTFOUND && osDstObject.GarnetObject is not ListObject)
509+
{
510+
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
511+
return true;
512+
}
513+
514+
dstStatusOp = storageSession.GET(dstKey, out ArgSlice _, ref context);
515+
if (dstStatusOp != GarnetStatus.NOTFOUND)
516+
{
517+
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
518+
return true;
519+
}
520+
}
521+
522+
return false;
523+
}
481524

482-
IGarnetObject dstObj = null;
483-
byte[] arrDstKey = default;
484525
if (command == RespCommand.BLMOVE)
485526
{
486527
arrDstKey = dstKey.ToArray();
@@ -494,8 +535,13 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
494535
{
495536
case ListObject listObj:
496537
currCount = listObj.LnkList.Count;
497-
if (objectType != GarnetObjectType.List) return false;
498-
if (currCount == 0) return false;
538+
if (objectType != GarnetObjectType.List)
539+
{
540+
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
541+
return initial;
542+
}
543+
if (currCount == 0)
544+
return false;
499545

500546
switch (command)
501547
{
@@ -516,7 +562,11 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
516562
{
517563
dstList = tmpDstList;
518564
}
519-
else return false;
565+
else
566+
{
567+
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
568+
return initial;
569+
}
520570

521571
isSuccessful = TryMoveNextListItem(listObj, dstList, (OperationDirection)cmdArgs[1].ReadOnlySpan[0],
522572
(OperationDirection)cmdArgs[2].ReadOnlySpan[0], out nextItem);
@@ -543,19 +593,24 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
543593
result = new CollectionItemResult(key, items);
544594
return true;
545595
default:
546-
return false;
596+
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
597+
return initial;
547598
}
548599
case SortedSetObject setObj:
549600
currCount = setObj.Count();
550601
if (objectType != GarnetObjectType.SortedSet)
551-
return false;
602+
{
603+
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
604+
return initial;
605+
}
552606
if (currCount == 0)
553607
return false;
554608

555609
return TryGetNextSetObjects(key, setObj, currCount, command, cmdArgs, out result);
556610

557611
default:
558-
return false;
612+
result = new CollectionItemResult(GarnetStatus.WRONGTYPE);
613+
return initial;
559614
}
560615
}
561616
finally

libs/server/Objects/ItemBroker/CollectionItemResult.cs

+13-4
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,39 @@ namespace Garnet.server
88
/// </summary>
99
internal readonly struct CollectionItemResult
1010
{
11+
public CollectionItemResult(GarnetStatus status)
12+
{
13+
Status = status;
14+
}
15+
1116
public CollectionItemResult(byte[] key, byte[] item)
1217
{
1318
Key = key;
1419
Item = item;
20+
Status = key == default ? GarnetStatus.NOTFOUND : GarnetStatus.OK;
1521
}
1622

1723
public CollectionItemResult(byte[] key, byte[][] items)
1824
{
1925
Key = key;
2026
Items = items;
27+
Status = key == default ? GarnetStatus.NOTFOUND : GarnetStatus.OK;
2128
}
2229

2330
public CollectionItemResult(byte[] key, double score, byte[] item)
2431
{
2532
Key = key;
26-
Score = score;
2733
Item = item;
34+
Score = score;
35+
Status = key == default ? GarnetStatus.NOTFOUND : GarnetStatus.OK;
2836
}
2937

3038
public CollectionItemResult(byte[] key, double[] scores, byte[][] items)
3139
{
3240
Key = key;
33-
Scores = scores;
3441
Items = items;
42+
Scores = scores;
43+
Status = key == default ? GarnetStatus.NOTFOUND : GarnetStatus.OK;
3544
}
3645

3746
private CollectionItemResult(bool isForceUnblocked)
@@ -40,9 +49,9 @@ private CollectionItemResult(bool isForceUnblocked)
4049
}
4150

4251
/// <summary>
43-
/// True if item was found
52+
/// Result status
4453
/// </summary>
45-
internal bool Found => Key != default;
54+
internal GarnetStatus Status { get; }
4655

4756
/// <summary>
4857
/// Key of collection from which item was retrieved

libs/server/Resp/Objects/ListCommands.cs

+78-44
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,12 @@ private bool ListBlockingPop(RespCommand command)
294294

295295
if (!parseState.TryGetDouble(parseState.Count - 1, out var timeout))
296296
{
297-
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT, ref dcurr, dend))
298-
SendAndReset();
299-
return true;
297+
return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT);
298+
}
299+
300+
if (timeout < 0)
301+
{
302+
return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_IS_NEGATIVE);
300303
}
301304

302305
if (storeWrapper.itemBroker == null)
@@ -311,21 +314,27 @@ private bool ListBlockingPop(RespCommand command)
311314
return true;
312315
}
313316

314-
if (!result.Found)
317+
switch (result.Status)
315318
{
316-
while (!RespWriteUtils.TryWriteNullArray(ref dcurr, dend))
317-
SendAndReset();
318-
}
319-
else
320-
{
321-
while (!RespWriteUtils.TryWriteArrayLength(2, ref dcurr, dend))
322-
SendAndReset();
319+
case GarnetStatus.OK:
320+
while (!RespWriteUtils.TryWriteArrayLength(2, ref dcurr, dend))
321+
SendAndReset();
323322

324-
while (!RespWriteUtils.TryWriteBulkString(new Span<byte>(result.Key), ref dcurr, dend))
325-
SendAndReset();
323+
while (!RespWriteUtils.TryWriteBulkString(new Span<byte>(result.Key), ref dcurr, dend))
324+
SendAndReset();
326325

327-
while (!RespWriteUtils.TryWriteBulkString(new Span<byte>(result.Item), ref dcurr, dend))
328-
SendAndReset();
326+
while (!RespWriteUtils.TryWriteBulkString(new Span<byte>(result.Item), ref dcurr, dend))
327+
SendAndReset();
328+
break;
329+
case GarnetStatus.WRONGTYPE:
330+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
331+
SendAndReset();
332+
break;
333+
case GarnetStatus.NOTFOUND:
334+
default:
335+
while (!RespWriteUtils.TryWriteNullArray(ref dcurr, dend))
336+
SendAndReset();
337+
break;
329338
}
330339

331340
return true;
@@ -345,9 +354,12 @@ private unsafe bool ListBlockingMove()
345354

346355
if (!parseState.TryGetDouble(4, out var timeout))
347356
{
348-
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT, ref dcurr, dend))
349-
SendAndReset();
350-
return true;
357+
return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT);
358+
}
359+
360+
if (timeout < 0)
361+
{
362+
return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_IS_NEGATIVE);
351363
}
352364

353365
return ListBlockingMove(srcKey, dstKey, srcDir, dstDir, timeout);
@@ -371,9 +383,12 @@ private bool ListBlockingPopPush()
371383

372384
if (!parseState.TryGetDouble(2, out var timeout))
373385
{
374-
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT, ref dcurr, dend))
375-
SendAndReset();
376-
return true;
386+
return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT);
387+
}
388+
389+
if (timeout < 0)
390+
{
391+
return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_IS_NEGATIVE);
377392
}
378393

379394
return ListBlockingMove(srcKey, dstKey, rightOption, leftOption, timeout);
@@ -405,14 +420,20 @@ private bool ListBlockingMove(ArgSlice srcKey, ArgSlice dstKey, ArgSlice srcDir,
405420
var result = storeWrapper.itemBroker.MoveCollectionItemAsync(RespCommand.BLMOVE, srcKey.ToArray(), this, timeout,
406421
cmdArgs).Result;
407422

408-
if (!result.Found)
423+
switch (result.Status)
409424
{
410-
WriteNull();
411-
}
412-
else
413-
{
414-
while (!RespWriteUtils.TryWriteBulkString(new Span<byte>(result.Item), ref dcurr, dend))
415-
SendAndReset();
425+
case GarnetStatus.OK:
426+
while (!RespWriteUtils.TryWriteBulkString(new Span<byte>(result.Item), ref dcurr, dend))
427+
SendAndReset();
428+
break;
429+
case GarnetStatus.WRONGTYPE:
430+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
431+
SendAndReset();
432+
break;
433+
case GarnetStatus.NOTFOUND:
434+
default:
435+
WriteNull();
436+
break;
416437
}
417438

418439
return true;
@@ -925,6 +946,11 @@ private unsafe bool ListBlockingPopMultiple()
925946
return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT);
926947
}
927948

949+
if (timeout < 0)
950+
{
951+
return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_IS_NEGATIVE);
952+
}
953+
928954
// Read count of keys
929955
if (!parseState.TryGetInt(currTokenId++, out var numKeys))
930956
{
@@ -987,28 +1013,36 @@ private unsafe bool ListBlockingPopMultiple()
9871013
{
9881014
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
9891015
SendAndReset();
990-
}
991-
992-
if (!result.Found)
993-
{
994-
WriteNull();
9951016
return true;
9961017
}
9971018

998-
while (!RespWriteUtils.TryWriteArrayLength(2, ref dcurr, dend))
999-
SendAndReset();
1019+
switch (result.Status)
1020+
{
1021+
case GarnetStatus.OK:
1022+
while (!RespWriteUtils.TryWriteArrayLength(2, ref dcurr, dend))
1023+
SendAndReset();
10001024

1001-
while (!RespWriteUtils.TryWriteBulkString(result.Key, ref dcurr, dend))
1002-
SendAndReset();
1025+
while (!RespWriteUtils.TryWriteBulkString(result.Key, ref dcurr, dend))
1026+
SendAndReset();
10031027

1004-
var elements = result.Items;
1005-
while (!RespWriteUtils.TryWriteArrayLength(elements.Length, ref dcurr, dend))
1006-
SendAndReset();
1028+
var elements = result.Items;
1029+
while (!RespWriteUtils.TryWriteArrayLength(elements.Length, ref dcurr, dend))
1030+
SendAndReset();
10071031

1008-
foreach (var element in elements)
1009-
{
1010-
while (!RespWriteUtils.TryWriteBulkString(element, ref dcurr, dend))
1011-
SendAndReset();
1032+
foreach (var element in elements)
1033+
{
1034+
while (!RespWriteUtils.TryWriteBulkString(element, ref dcurr, dend))
1035+
SendAndReset();
1036+
}
1037+
break;
1038+
case GarnetStatus.WRONGTYPE:
1039+
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
1040+
SendAndReset();
1041+
break;
1042+
case GarnetStatus.NOTFOUND:
1043+
default:
1044+
WriteNull();
1045+
break;
10121046
}
10131047

10141048
return true;

0 commit comments

Comments
 (0)