@@ -4457,3 +4457,188 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T
4457
4457
}
4458
4458
}
4459
4459
}
4460
+
4461
+ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream (t * testing.T ) {
4462
+ c := createJetStreamClusterExplicit (t , "R3S" , 3 )
4463
+ defer c .shutdown ()
4464
+
4465
+ nc , js := jsClientConnect (t , c .randomServer ())
4466
+ defer nc .Close ()
4467
+
4468
+ _ , err := js .AddStream (& nats.StreamConfig {
4469
+ Name : "TEST" ,
4470
+ Subjects : []string {"foo" },
4471
+ Retention : nats .WorkQueuePolicy ,
4472
+ Replicas : 3 ,
4473
+ })
4474
+ require_NoError (t , err )
4475
+
4476
+ _ , err = js .Publish ("foo" , nil )
4477
+ require_NoError (t , err )
4478
+
4479
+ // Wait for all servers to have applied everything.
4480
+ var maxApplied uint64
4481
+ checkFor (t , 5 * time .Second , 100 * time .Millisecond , func () error {
4482
+ maxApplied = 0
4483
+ for _ , s := range c .servers {
4484
+ acc , err := s .lookupAccount (globalAccountName )
4485
+ if err != nil {
4486
+ return err
4487
+ }
4488
+ mset , err := acc .lookupStream ("TEST" )
4489
+ if err != nil {
4490
+ return err
4491
+ }
4492
+ _ , _ , applied := mset .node .Progress ()
4493
+ if maxApplied == 0 {
4494
+ maxApplied = applied
4495
+ } else if applied < maxApplied {
4496
+ return fmt .Errorf ("applied not high enough, expected %d, got %d" , applied , maxApplied )
4497
+ } else if applied > maxApplied {
4498
+ return fmt .Errorf ("applied higher on one server, expected %d, got %d" , applied , maxApplied )
4499
+ }
4500
+ }
4501
+ return nil
4502
+ })
4503
+
4504
+ // Install a snapshot on a follower.
4505
+ s := c .randomNonStreamLeader (globalAccountName , "TEST" )
4506
+ acc , err := s .lookupAccount (globalAccountName )
4507
+ require_NoError (t , err )
4508
+ mset , err := acc .lookupStream ("TEST" )
4509
+ require_NoError (t , err )
4510
+ err = mset .node .InstallSnapshot (mset .stateSnapshotLocked ())
4511
+ require_NoError (t , err )
4512
+
4513
+ // Validate the snapshot reflects applied.
4514
+ validateStreamState := func (snap * snapshot ) {
4515
+ t .Helper ()
4516
+ require_Equal (t , snap .lastIndex , maxApplied )
4517
+ ss , err := DecodeStreamState (snap .data )
4518
+ require_NoError (t , err )
4519
+ require_Equal (t , ss .FirstSeq , 1 )
4520
+ require_Equal (t , ss .LastSeq , 1 )
4521
+ }
4522
+ snap , err := mset .node .(* raft ).loadLastSnapshot ()
4523
+ require_NoError (t , err )
4524
+ validateStreamState (snap )
4525
+
4526
+ // Simulate a message being stored, but not calling Applied yet.
4527
+ err = mset .processJetStreamMsg ("foo" , _EMPTY_ , nil , nil , 1 , time .Now ().UnixNano (), nil )
4528
+ require_NoError (t , err )
4529
+
4530
+ // Simulate the stream being stopped before we're able to call Applied.
4531
+ // If we'd install a snapshot during this, which would be a race condition,
4532
+ // we'd store a snapshot with state that's ahead of applied.
4533
+ err = mset .stop (false , false )
4534
+ require_NoError (t , err )
4535
+
4536
+ // Validate the snapshot is the same as before.
4537
+ snap , err = mset .node .(* raft ).loadLastSnapshot ()
4538
+ require_NoError (t , err )
4539
+ validateStreamState (snap )
4540
+ }
4541
+
4542
+ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer (t * testing.T ) {
4543
+ c := createJetStreamClusterExplicit (t , "R3S" , 3 )
4544
+ defer c .shutdown ()
4545
+
4546
+ nc , js := jsClientConnect (t , c .randomServer ())
4547
+ defer nc .Close ()
4548
+
4549
+ _ , err := js .AddStream (& nats.StreamConfig {
4550
+ Name : "TEST" ,
4551
+ Subjects : []string {"foo" },
4552
+ Retention : nats .WorkQueuePolicy ,
4553
+ Replicas : 3 ,
4554
+ })
4555
+ require_NoError (t , err )
4556
+
4557
+ _ , err = js .AddConsumer ("TEST" , & nats.ConsumerConfig {
4558
+ Durable : "CONSUMER" ,
4559
+ Replicas : 3 ,
4560
+ AckPolicy : nats .AckExplicitPolicy ,
4561
+ })
4562
+ require_NoError (t , err )
4563
+
4564
+ // Add a message and let the consumer ack it, this moves the consumer's RAFT applied up.
4565
+ _ , err = js .Publish ("foo" , nil )
4566
+ require_NoError (t , err )
4567
+ sub , err := js .PullSubscribe ("foo" , "CONSUMER" )
4568
+ require_NoError (t , err )
4569
+ msgs , err := sub .Fetch (1 )
4570
+ require_NoError (t , err )
4571
+ require_Len (t , len (msgs ), 1 )
4572
+ err = msgs [0 ].AckSync ()
4573
+ require_NoError (t , err )
4574
+
4575
+ // Wait for all servers to have applied everything.
4576
+ var maxApplied uint64
4577
+ checkFor (t , 5 * time .Second , 100 * time .Millisecond , func () error {
4578
+ maxApplied = 0
4579
+ for _ , s := range c .servers {
4580
+ acc , err := s .lookupAccount (globalAccountName )
4581
+ if err != nil {
4582
+ return err
4583
+ }
4584
+ mset , err := acc .lookupStream ("TEST" )
4585
+ if err != nil {
4586
+ return err
4587
+ }
4588
+ o := mset .lookupConsumer ("CONSUMER" )
4589
+ if o == nil {
4590
+ return errors .New ("consumer not found" )
4591
+ }
4592
+ _ , _ , applied := o .node .Progress ()
4593
+ if maxApplied == 0 {
4594
+ maxApplied = applied
4595
+ } else if applied < maxApplied {
4596
+ return fmt .Errorf ("applied not high enough, expected %d, got %d" , applied , maxApplied )
4597
+ } else if applied > maxApplied {
4598
+ return fmt .Errorf ("applied higher on one server, expected %d, got %d" , applied , maxApplied )
4599
+ }
4600
+ }
4601
+ return nil
4602
+ })
4603
+
4604
+ // Install a snapshot on a follower.
4605
+ s := c .randomNonStreamLeader (globalAccountName , "TEST" )
4606
+ acc , err := s .lookupAccount (globalAccountName )
4607
+ require_NoError (t , err )
4608
+ mset , err := acc .lookupStream ("TEST" )
4609
+ require_NoError (t , err )
4610
+ o := mset .lookupConsumer ("CONSUMER" )
4611
+ require_NotNil (t , o )
4612
+ snapBytes , err := o .store .EncodedState ()
4613
+ require_NoError (t , err )
4614
+ err = o .node .InstallSnapshot (snapBytes )
4615
+ require_NoError (t , err )
4616
+
4617
+ // Validate the snapshot reflects applied.
4618
+ validateStreamState := func (snap * snapshot ) {
4619
+ t .Helper ()
4620
+ require_Equal (t , snap .lastIndex , maxApplied )
4621
+ state , err := decodeConsumerState (snap .data )
4622
+ require_NoError (t , err )
4623
+ require_Equal (t , state .Delivered .Consumer , 1 )
4624
+ require_Equal (t , state .Delivered .Stream , 1 )
4625
+ }
4626
+ snap , err := o .node .(* raft ).loadLastSnapshot ()
4627
+ require_NoError (t , err )
4628
+ validateStreamState (snap )
4629
+
4630
+ // Simulate a message being delivered, but not calling Applied yet.
4631
+ err = o .store .UpdateDelivered (2 , 2 , 1 , time .Now ().UnixNano ())
4632
+ require_NoError (t , err )
4633
+
4634
+ // Simulate the consumer being stopped before we're able to call Applied.
4635
+ // If we'd install a snapshot during this, which would be a race condition,
4636
+ // we'd store a snapshot with state that's ahead of applied.
4637
+ err = o .stop ()
4638
+ require_NoError (t , err )
4639
+
4640
+ // Validate the snapshot is the same as before.
4641
+ snap , err = o .node .(* raft ).loadLastSnapshot ()
4642
+ require_NoError (t , err )
4643
+ validateStreamState (snap )
4644
+ }
0 commit comments