@@ -235,7 +235,7 @@ private void InitializeObserver(CollectionItemObserver observer, byte[][] keys)
235
235
// If the key already has a non-empty observer queue, it does not have an item to retrieve
236
236
// Otherwise, try to retrieve next available item
237
237
if ( ( KeysToObservers . ContainsKey ( key ) && KeysToObservers [ key ] . Count > 0 ) ||
238
- ! TryGetResult ( key , observer . Session . storageSession , observer . Command , observer . CommandArgs ,
238
+ ! TryGetResult ( key , observer . Session . storageSession , observer . Command , observer . CommandArgs , true ,
239
239
out _ , out var result ) ) continue ;
240
240
241
241
// An item was found - set the observer result and return
@@ -291,7 +291,7 @@ private bool TryAssignItemFromKey(byte[] key)
291
291
}
292
292
293
293
// Try to get next available item from object stored in key
294
- if ( ! TryGetResult ( key , observer . Session . storageSession , observer . Command , observer . CommandArgs ,
294
+ if ( ! TryGetResult ( key , observer . Session . storageSession , observer . Command , observer . CommandArgs , false ,
295
295
out var currCount , out var result ) )
296
296
{
297
297
// If unsuccessful getting next item but there is at least one item in the collection,
@@ -436,7 +436,9 @@ private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sort
436
436
}
437
437
}
438
438
439
- private unsafe bool TryGetResult( byte [ ] key, StorageSession storageSession, RespCommand command, ArgSlice[ ] cmdArgs , out int currCount , out CollectionItemResult result )
439
+ private unsafe bool TryGetResult( byte [ ] key, StorageSession storageSession,
440
+ RespCommand command, ArgSlice[ ] cmdArgs , bool initial ,
441
+ out int currCount , out CollectionItemResult result )
440
442
{
441
443
currCount = default ;
442
444
result = default ;
@@ -461,26 +463,65 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
461
463
Debug. Assert ( storageSession . txnManager . state == TxnState . None ) ;
462
464
createTransaction = true;
463
465
var asKey = storageSession. scratchBufferManager . CreateArgSlice ( key ) ;
466
+ if ( initial )
467
+ storageSession. txnManager . SaveKeyEntryToLock ( asKey , false , LockType . Exclusive ) ;
464
468
storageSession. txnManager . SaveKeyEntryToLock ( asKey , true , LockType . Exclusive ) ;
465
469
466
470
if ( command == RespCommand . BLMOVE )
467
471
{
472
+ if ( initial )
473
+ storageSession. txnManager . SaveKeyEntryToLock ( dstKey , false , LockType . Exclusive ) ;
468
474
storageSession. txnManager . SaveKeyEntryToLock ( dstKey , true , LockType . Exclusive ) ;
469
475
}
470
476
471
477
_ = storageSession. txnManager . Run ( true ) ;
472
478
}
473
479
474
480
var objectLockableContext = storageSession. txnManager . ObjectStoreLockableContext ;
481
+ IGarnetObject dstObj = null ;
482
+ byte [ ] arrDstKey = default ;
475
483
476
484
try
477
485
{
478
486
// Get the object stored at key
479
487
var statusOp = storageSession . GET ( key , out var osObject , ref objectLockableContext ) ;
480
- if ( statusOp == GarnetStatus . NOTFOUND ) return false;
488
+ if ( statusOp == GarnetStatus . NOTFOUND )
489
+ {
490
+ if ( ! initial )
491
+ return false;
492
+
493
+ var context = storageSession . txnManager . LockableContext ;
494
+
495
+ var keySlice = storageSession . scratchBufferManager . CreateArgSlice ( key ) ;
496
+ statusOp = storageSession . GET ( keySlice , out ArgSlice _ , ref context ) ;
497
+
498
+ if ( statusOp != GarnetStatus . NOTFOUND )
499
+ {
500
+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
501
+ return true;
502
+ }
503
+
504
+ if ( command == RespCommand . BLMOVE )
505
+ {
506
+ arrDstKey = dstKey . ToArray ( ) ;
507
+ var dstStatusOp = storageSession . GET ( arrDstKey , out var osDstObject , ref objectLockableContext ) ;
508
+ if ( dstStatusOp != GarnetStatus . NOTFOUND && osDstObject . GarnetObject is not ListObject )
509
+ {
510
+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
511
+ return true;
512
+ }
513
+
514
+ dstStatusOp = storageSession . GET ( dstKey , out ArgSlice _ , ref context ) ;
515
+ if ( dstStatusOp != GarnetStatus . NOTFOUND )
516
+ {
517
+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
518
+ return true;
519
+ }
520
+ }
521
+
522
+ return false;
523
+ }
481
524
482
- IGarnetObject dstObj = null ;
483
- byte [ ] arrDstKey = default ;
484
525
if ( command == RespCommand . BLMOVE )
485
526
{
486
527
arrDstKey = dstKey. ToArray ( ) ;
@@ -494,8 +535,13 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
494
535
{
495
536
case ListObject listObj:
496
537
currCount = listObj. LnkList . Count ;
497
- if ( objectType != GarnetObjectType . List ) return false;
498
- if ( currCount == 0 ) return false;
538
+ if ( objectType != GarnetObjectType . List )
539
+ {
540
+ result = new CollectionItemResult( GarnetStatus . WRONGTYPE ) ;
541
+ return initial;
542
+ }
543
+ if ( currCount == 0 )
544
+ return false;
499
545
500
546
switch ( command )
501
547
{
@@ -516,7 +562,11 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
516
562
{
517
563
dstList = tmpDstList ;
518
564
}
519
- else return false;
565
+ else
566
+ {
567
+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
568
+ return initial;
569
+ }
520
570
521
571
isSuccessful = TryMoveNextListItem ( listObj , dstList , ( OperationDirection ) cmdArgs [ 1 ] . ReadOnlySpan [ 0 ] ,
522
572
( OperationDirection ) cmdArgs [ 2 ] . ReadOnlySpan [ 0 ] , out nextItem ) ;
@@ -543,19 +593,24 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
543
593
result = new CollectionItemResult ( key , items ) ;
544
594
return true ;
545
595
default :
546
- return false ;
596
+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
597
+ return initial ;
547
598
}
548
599
case SortedSetObject setObj :
549
600
currCount = setObj. Count( ) ;
550
601
if ( objectType ! = GarnetObjectType . SortedSet )
551
- return false ;
602
+ {
603
+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
604
+ return initial;
605
+ }
552
606
if ( currCount == 0 )
553
607
return false ;
554
608
555
609
return TryGetNextSetObjects( key , setObj , currCount , command , cmdArgs , out result ) ;
556
610
557
611
default :
558
- return false;
612
+ result = new CollectionItemResult ( GarnetStatus . WRONGTYPE ) ;
613
+ return initial;
559
614
}
560
615
}
561
616
finally
0 commit comments