@@ -7387,6 +7387,89 @@ func TestJetStreamClusterStreamHealthCheckMustNotDeleteEarly(t *testing.T) {
7387
7387
require_Equal (t , node .State (), Follower )
7388
7388
}
7389
7389
7390
+ func TestJetStreamClusterStreamHealthCheckOnlyReportsSkew (t * testing.T ) {
7391
+ c := createJetStreamClusterExplicit (t , "R3S" , 3 )
7392
+ defer c .shutdown ()
7393
+
7394
+ nc , js := jsClientConnect (t , c .randomServer ())
7395
+ defer nc .Close ()
7396
+
7397
+ waitForStreamAssignments := func () {
7398
+ t .Helper ()
7399
+ checkFor (t , 5 * time .Second , time .Second , func () error {
7400
+ for _ , s := range c .servers {
7401
+ js := s .getJetStream ()
7402
+ js .mu .RLock ()
7403
+ sa := js .streamAssignment (globalAccountName , "TEST" )
7404
+ js .mu .RUnlock ()
7405
+ if sa == nil {
7406
+ return fmt .Errorf ("stream assignment not found on %s" , s .Name ())
7407
+ }
7408
+ }
7409
+ return nil
7410
+ })
7411
+ }
7412
+ getStreamAssignment := func (rs * Server ) (* jetStream , * Account , * streamAssignment , * stream ) {
7413
+ acc , err := rs .lookupAccount (globalAccountName )
7414
+ require_NoError (t , err )
7415
+ mset , err := acc .lookupStream ("TEST" )
7416
+ require_NotNil (t , err )
7417
+
7418
+ sjs := rs .getJetStream ()
7419
+ sjs .mu .RLock ()
7420
+ defer sjs .mu .RUnlock ()
7421
+
7422
+ sas := sjs .cluster .streams [globalAccountName ]
7423
+ require_True (t , sas != nil )
7424
+ sa := sas ["TEST" ]
7425
+ require_True (t , sa != nil )
7426
+ sa .Created = time.Time {}
7427
+ return sjs , acc , sa , mset
7428
+ }
7429
+
7430
+ _ , err := js .AddStream (& nats.StreamConfig {
7431
+ Name : "TEST" ,
7432
+ Subjects : []string {"foo" },
7433
+ Replicas : 3 ,
7434
+ })
7435
+ require_NoError (t , err )
7436
+ waitForStreamAssignments ()
7437
+
7438
+ // Confirm the stream and assignment Raft nodes are equal.
7439
+ rs := c .randomNonStreamLeader (globalAccountName , "TEST" )
7440
+ sjs , acc , sa , mset := getStreamAssignment (rs )
7441
+ mset .mu .RLock ()
7442
+ msetNode := mset .node
7443
+ mset .mu .RUnlock ()
7444
+ sjs .mu .Lock ()
7445
+ group := sa .Group
7446
+ if group == nil {
7447
+ sjs .mu .Unlock ()
7448
+ t .Fatal ("sa.Group not initialized" )
7449
+ }
7450
+ node := group .node
7451
+ if node == nil {
7452
+ sjs .mu .Unlock ()
7453
+ t .Fatal ("sa.Group.node not initialized" )
7454
+ }
7455
+ sjs .mu .Unlock ()
7456
+ require_Equal (t , msetNode , node )
7457
+
7458
+ // Simulate stopping and restarting a new instance.
7459
+ node .Stop ()
7460
+ node .WaitForStop ()
7461
+ node , err = sjs .createRaftGroup (globalAccountName , group , FileStorage , pprofLabels {})
7462
+ require_NoError (t , err )
7463
+ require_NotEqual (t , node .State (), Closed )
7464
+
7465
+ // The health check gets the Raft node of the assignment and checks it against the
7466
+ // Raft node of the stream. We simulate a race condition where the assignment's Raft node
7467
+ // is re-newed, but the stream's node is still the old instance.
7468
+ // The health check MUST NOT delete the node.
7469
+ require_Error (t , sjs .isStreamHealthy (acc , sa ), errors .New ("cluster node skew detected" ))
7470
+ require_NotEqual (t , node .State (), Closed )
7471
+ }
7472
+
7390
7473
func TestJetStreamClusterConsumerHealthCheckMustNotRecreate (t * testing.T ) {
7391
7474
c := createJetStreamClusterExplicit (t , "R3S" , 3 )
7392
7475
defer c .shutdown ()
@@ -7481,7 +7564,7 @@ func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) {
7481
7564
ca .pending = true
7482
7565
sjs .mu .Unlock ()
7483
7566
sjs .isConsumerHealthy (mset , "CONSUMER" , ca )
7484
- require_False (t , ca .pending )
7567
+ require_True (t , ca .pending )
7485
7568
7486
7569
err = js .DeleteConsumer ("TEST" , "CONSUMER" )
7487
7570
require_NoError (t , err )
@@ -7582,6 +7665,91 @@ func TestJetStreamClusterConsumerHealthCheckMustNotDeleteEarly(t *testing.T) {
7582
7665
require_Equal (t , node .State (), Follower )
7583
7666
}
7584
7667
7668
+ func TestJetStreamClusterConsumerHealthCheckOnlyReportsSkew (t * testing.T ) {
7669
+ c := createJetStreamClusterExplicit (t , "R3S" , 3 )
7670
+ defer c .shutdown ()
7671
+
7672
+ nc , js := jsClientConnect (t , c .randomServer ())
7673
+ defer nc .Close ()
7674
+
7675
+ waitForConsumerAssignments := func () {
7676
+ t .Helper ()
7677
+ checkFor (t , 5 * time .Second , time .Second , func () error {
7678
+ for _ , s := range c .servers {
7679
+ if s .getJetStream ().consumerAssignment (globalAccountName , "TEST" , "CONSUMER" ) == nil {
7680
+ return fmt .Errorf ("stream assignment not found on %s" , s .Name ())
7681
+ }
7682
+ }
7683
+ return nil
7684
+ })
7685
+ }
7686
+ getConsumerAssignment := func (rs * Server ) (* jetStream , * consumerAssignment , * stream , * consumer ) {
7687
+ acc , err := rs .lookupAccount (globalAccountName )
7688
+ require_NoError (t , err )
7689
+ mset , err := acc .lookupStream ("TEST" )
7690
+ require_NotNil (t , err )
7691
+ o := mset .lookupConsumer ("CONSUMER" )
7692
+
7693
+ sjs := rs .getJetStream ()
7694
+ sjs .mu .RLock ()
7695
+ defer sjs .mu .RUnlock ()
7696
+
7697
+ sas := sjs .cluster .streams [globalAccountName ]
7698
+ require_True (t , sas != nil )
7699
+ sa := sas ["TEST" ]
7700
+ require_True (t , sa != nil )
7701
+ ca := sa .consumers ["CONSUMER" ]
7702
+ require_True (t , ca != nil )
7703
+ ca .Created = time.Time {}
7704
+ return sjs , ca , mset , o
7705
+ }
7706
+
7707
+ _ , err := js .AddStream (& nats.StreamConfig {
7708
+ Name : "TEST" ,
7709
+ Subjects : []string {"foo" },
7710
+ Replicas : 3 ,
7711
+ Retention : nats .InterestPolicy , // Replicated consumers by default
7712
+ })
7713
+ require_NoError (t , err )
7714
+ _ , err = js .AddConsumer ("TEST" , & nats.ConsumerConfig {Durable : "CONSUMER" })
7715
+ require_NoError (t , err )
7716
+ waitForConsumerAssignments ()
7717
+
7718
+ // Confirm the consumer and assignment Raft nodes are equal.
7719
+ rs := c .randomNonConsumerLeader (globalAccountName , "TEST" , "CONSUMER" )
7720
+ sjs , ca , mset , o := getConsumerAssignment (rs )
7721
+ o .mu .RLock ()
7722
+ oNode := o .node
7723
+ o .mu .RUnlock ()
7724
+ sjs .mu .Lock ()
7725
+ group := ca .Group
7726
+ if group == nil {
7727
+ sjs .mu .Unlock ()
7728
+ t .Fatal ("ca.Group not initialized" )
7729
+ }
7730
+ node := group .node
7731
+ if node == nil {
7732
+ sjs .mu .Unlock ()
7733
+ t .Fatal ("ca.Group.node not initialized" )
7734
+ }
7735
+ sjs .mu .Unlock ()
7736
+ require_Equal (t , oNode , node )
7737
+
7738
+ // Simulate stopping and restarting a new instance.
7739
+ node .Stop ()
7740
+ node .WaitForStop ()
7741
+ node , err = sjs .createRaftGroup (globalAccountName , group , FileStorage , pprofLabels {})
7742
+ require_NoError (t , err )
7743
+ require_NotEqual (t , node .State (), Closed )
7744
+
7745
+ // The health check gets the Raft node of the assignment and checks it against the
7746
+ // Raft node of the consumer. We simulate a race condition where the assignment's Raft node
7747
+ // is re-newed, but the consumer's node is still the old instance.
7748
+ // The health check MUST NOT delete the node.
7749
+ require_Error (t , sjs .isConsumerHealthy (mset , "CONSUMER" , ca ), errors .New ("cluster node skew detected" ))
7750
+ require_NotEqual (t , node .State (), Closed )
7751
+ }
7752
+
7585
7753
func TestJetStreamClusterRespectConsumerStartSeq (t * testing.T ) {
7586
7754
c := createJetStreamClusterExplicit (t , "R3S" , 3 )
7587
7755
defer c .shutdown ()
0 commit comments