@@ -482,3 +482,82 @@ func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombsto
}
}
}
+
+// TestResumeCompactionOnTombstone verifies whether a deletion event is preserved
+// when etcd restarts and resumes compaction on a key that only has a tombstone revision.
+func TestResumeCompactionOnTombstone(t *testing.T) {
+	e2e.BeforeTest(t)
+
+	ctx := context.Background()
+	compactBatchLimit := 5
+
+	cfg := e2e.EtcdProcessClusterConfig{
+		GoFailEnabled:              true,
+		ClusterSize:                1,
+		IsClientAutoTLS:            true,
+		ClientTLS:                  e2e.ClientTLS,
+		CompactionBatchLimit:       compactBatchLimit,
+		WatchProcessNotifyInterval: 100 * time.Millisecond,
+	}
+	clus, err := e2e.NewEtcdProcessCluster(t, &cfg)
+	require.NoError(t, err)
+	defer clus.Close()
+
+	c1 := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS)
+	defer c1.Close()
+
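+	// Write exactly one compaction batch (compactBatchLimit) worth of keys,
+	// then tombstone the first of them.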
+	keyPrefix := "/key-"
+	for i := 0; i < compactBatchLimit; i++ {
+		key := fmt.Sprintf("%s%d", keyPrefix, i)
+		value := fmt.Sprintf("%d", i)
+
+		t.Logf("PUT key=%s, val=%s", key, value)
+		_, err = c1.KV.Put(ctx, key, value)
+		require.NoError(t, err)
+	}
+
+	firstKey := keyPrefix + "0"
+	t.Logf("DELETE key=%s", firstKey)
+	deleteResp, err := c1.KV.Delete(ctx, firstKey)
+	require.NoError(t, err)
+
+	var deleteEvent *clientv3.Event
+	select {
+	case watchResp := <-c1.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision)):
+		require.Len(t, watchResp.Events, 1)
+
+		require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type)
+		deletedKey := string(watchResp.Events[0].Kv.Key)
+		require.Equal(t, firstKey, deletedKey)
+
+		deleteEvent = watchResp.Events[0]
+	case <-time.After(100 * time.Millisecond):
+		t.Fatal("timed out getting watch response")
+	}
+
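+	// Arm the failpoint so the etcd process panics before the compaction is
+	// recorded as finished, forcing it to resume the compaction after restart.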
+	require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactBeforeSetFinishedCompact", `panic`))
+
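+	// The compaction request is expected to fail because the server panics
+	// on the failpoint before it can respond.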
+	t.Logf("COMPACT rev=%d", deleteResp.Header.Revision)
+	_, err = c1.KV.Compact(ctx, deleteResp.Header.Revision, clientv3.WithCompactPhysical())
+	require.Error(t, err)
+
+	require.Error(t, clus.Procs[0].Stop())
+	// NOTE: The proc panics and its exit code is 2. It's impossible to restart
+	// that etcd proc because the last exit code is 2 and Restart() refuses to
+	// start a new one. Calling IsRunning() here clears that status.
+	require.False(t, clus.Procs[0].IsRunning())
+	require.NoError(t, clus.Restart())
+
+	c2 := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS)
+	defer c2.Close()
+
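+	// After restart the compaction resumes; the tombstone revision must survive so
+	// a watcher starting from the delete revision still receives the delete event.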
+	watchChan := c2.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision))
+	select {
+	case watchResp := <-watchChan:
+		require.Equal(t, []*clientv3.Event{deleteEvent}, watchResp.Events)
+	case <-time.After(100 * time.Millisecond):
+		// we care only about the first response, but have an
+		// escape hatch in case the watch response is delayed.
+		t.Fatal("timed out getting watch response")
+	}
+}