Commit 2dc32c1

[FIXED] Peer removal race (#6316)
There was a race in keeping peers up to date. When updating the replica count of a stream up and down rapidly enough, the peer state could desync, because the update is handled by the meta layer while the peer removal is handled by the stream. If the meta layer agreed on scaling the stream up to 5 replicas, but a peer removal from an earlier downscale came in afterwards, a server would end up with an incomplete peer list.

Since peer changes are done by the meta layer, we can and should trust that it does the right thing, and we don't need to interfere by proposing peer removals ourselves. Also, since the meta layer handles stream creates/updates and already calls into `UpdateKnownPeers`, we shouldn't then propose the peers (again) through the stream layer.

Signed-off-by: Maurice van Veen <[email protected]>
2 parents afb7e0b + cd7fb62 commit 2dc32c1
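
To make the ordering concrete, here is a minimal, self-contained Go sketch (toy data only, not the server's actual types) of the interleaving the commit message describes: the meta layer installs the authoritative 5-peer set for the upscale, and a stale stream-layer removal left over from the earlier downscale then lands on top of it, leaving the server tracking an incomplete peer list.

```go
package main

import "fmt"

func main() {
	// Peer set as tracked on one server (toy model, not nats-server types).
	peers := map[string]bool{}

	// Meta layer applies the upscale back to 5 replicas and installs the
	// full, authoritative peer set (analogous to UpdateKnownPeers).
	for _, p := range []string{"A", "B", "C", "D", "E"} {
		peers[p] = true
	}

	// A peer removal proposed by the stream layer during the earlier
	// downscale to 3 replicas is applied after the upscale...
	delete(peers, "D")

	// ...so this server now tracks only 4 of the 5 expected peers.
	fmt.Printf("tracking %d peers, expected 5\n", len(peers))
}
```

The fix removes the second step entirely: only the meta layer mutates the known peer set, so a stale stream-layer removal can no longer clobber it.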

File tree

3 files changed: +70 additions, -36 deletions


server/jetstream_cluster.go

Lines changed: 5 additions & 30 deletions
```diff
@@ -2721,8 +2721,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 			// keep stream assignment current
 			sa = mset.streamAssignment()
 
-			// keep peer list up to date with config
-			js.checkPeers(mset.raftGroup())
 			// We get this when we have a new stream assignment caused by an update.
 			// We want to know if we are migrating.
 			if migrating := mset.isMigrating(); migrating {
@@ -2810,7 +2808,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 			// Check if we have a quorom.
 			if current >= neededCurrent {
 				s.Noticef("Transfer of stream leader for '%s > %s' to '%s'", accName, sa.Config.Name, newLeader)
-				n.UpdateKnownPeers(newPeers)
+				n.ProposeKnownPeers(newPeers)
 				n.StepDown(newLeaderPeer)
 			}
 		}
@@ -3345,22 +3343,6 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo {
 	return replicas
 }
 
-// Will check our node peers and see if we should remove a peer.
-func (js *jetStream) checkPeers(rg *raftGroup) {
-	js.mu.Lock()
-	defer js.mu.Unlock()
-
-	// FIXME(dlc) - Single replicas?
-	if rg == nil || rg.node == nil {
-		return
-	}
-	for _, peer := range rg.node.Peers() {
-		if !rg.isMember(peer.ID) {
-			rg.node.ProposeRemovePeer(peer.ID)
-		}
-	}
-}
-
 // Process a leader change for the clustered stream.
 func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
 	if mset == nil {
@@ -3393,8 +3375,6 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
 	if isLeader {
 		s.Noticef("JetStream cluster new stream leader for '%s > %s'", account, streamName)
 		s.sendStreamLeaderElectAdvisory(mset)
-		// Check for peer removal and process here if needed.
-		js.checkPeers(sa.Group)
 		mset.checkAllowMsgCompress(peers)
 	} else {
 		// We are stepping down.
@@ -3611,7 +3591,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
 		js.processClusterCreateStream(acc, sa)
 	} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
 		// We have one here even though we are not a member. This can happen on re-assignment.
-		s.removeStream(ourID, mset, sa)
+		s.removeStream(mset, sa)
 	}
 
 	// If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected.
@@ -3699,13 +3679,13 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
 		js.processClusterUpdateStream(acc, osa, sa)
 	} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
 		// We have one here even though we are not a member. This can happen on re-assignment.
-		s.removeStream(ourID, mset, sa)
+		s.removeStream(mset, sa)
 	}
 }
 
-// Common function to remove ourself from this server.
+// Common function to remove ourselves from this server.
 // This can happen on re-assignment, move, etc
-func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) {
+func (s *Server) removeStream(mset *stream, nsa *streamAssignment) {
 	if mset == nil {
 		return
 	}
@@ -3715,7 +3695,6 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment)
 	if node.Leader() {
 		node.StepDown(nsa.Group.Preferred)
 	}
-	node.ProposeRemovePeer(ourID)
 	// shutdown monitor by shutting down raft.
 	node.Delete()
 }
@@ -5051,8 +5030,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 			// We get this when we have a new consumer assignment caused by an update.
 			// We want to know if we are migrating.
 			rg := o.raftGroup()
-			// keep peer list up to date with config
-			js.checkPeers(rg)
 			// If we are migrating, monitor for the new peers to be caught up.
 			replicas, err := o.replica()
 			if err != nil {
@@ -5369,8 +5346,6 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
 	if isLeader {
 		s.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.serviceAccount(), streamName, consumerName)
 		s.sendConsumerLeaderElectAdvisory(o)
-		// Check for peer removal and process here if needed.
-		js.checkPeers(ca.Group)
 	} else {
 		// We are stepping down.
 		// Make sure if we are doing so because we have lost quorum that we send the appropriate advisories.
```

server/jetstream_cluster_1_test.go

Lines changed: 54 additions & 0 deletions
```diff
@@ -6945,6 +6945,60 @@ func TestJetStreamClusterConsumerInfoAfterCreate(t *testing.T) {
 	require_NoError(t, err)
 }
 
+func TestJetStreamClusterStreamUpscalePeersAfterDownscale(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R5S", 5)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	checkPeerSet := func() {
+		t.Helper()
+		checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+			for _, s := range c.servers {
+				acc, err := s.lookupAccount(globalAccountName)
+				if err != nil {
+					return err
+				}
+				mset, err := acc.lookupStream("TEST")
+				if err != nil {
+					return err
+				}
+				peers := mset.raftNode().Peers()
+				if len(peers) != 5 {
+					return fmt.Errorf("expected 5 peers, got %d", len(peers))
+				}
+			}
+			return nil
+		})
+	}
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 5,
+	})
+	require_NoError(t, err)
+
+	checkPeerSet()
+
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	_, err = js.UpdateStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 5,
+	})
+	require_NoError(t, err)
+
+	checkPeerSet()
+}
+
 //
 // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
 // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
```
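
For local verification, the new test can usually be run on its own with standard Go tooling, e.g. `go test -run TestJetStreamClusterStreamUpscalePeersAfterDownscale ./server` from the repository root (exact flags may vary with the checkout's test setup). It scales the stream from 5 replicas down to 3 and back up to 5, then checks that every server again reports all 5 peers.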

server/raft.go

Lines changed: 11 additions & 6 deletions
```diff
@@ -61,6 +61,7 @@ type RaftNode interface {
 	ID() string
 	Group() string
 	Peers() []*Peer
+	ProposeKnownPeers(knownPeers []string)
 	UpdateKnownPeers(knownPeers []string)
 	ProposeAddPeer(peer string) error
 	ProposeRemovePeer(peer string) error
@@ -1699,19 +1700,23 @@ func (n *raft) Peers() []*Peer {
 	return peers
 }
 
+// Update and propose our known set of peers.
+func (n *raft) ProposeKnownPeers(knownPeers []string) {
+	// If we are the leader update and send this update out.
+	if n.State() != Leader {
+		return
+	}
+	n.UpdateKnownPeers(knownPeers)
+	n.sendPeerState()
+}
+
 // Update our known set of peers.
 func (n *raft) UpdateKnownPeers(knownPeers []string) {
 	n.Lock()
 	// Process like peer state update.
 	ps := &peerState{knownPeers, len(knownPeers), n.extSt}
 	n.processPeerState(ps)
-	isLeader := n.State() == Leader
 	n.Unlock()
-
-	// If we are the leader send this update out as well.
-	if isLeader {
-		n.sendPeerState()
-	}
 }
 
 // ApplyQ returns the apply queue that new commits will be sent to for the
```
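
A rough way to read this raft.go change (toy types below, assumed for illustration only; not the actual raft implementation): `UpdateKnownPeers` now only rewrites the node's local peer state, while the new `ProposeKnownPeers` is a no-op on non-leaders and, on the leader, updates local state and then replicates it. That is the variant the stream leader-transfer path above now calls before stepping down.

```go
package main

import "fmt"

// Toy model of the split: UpdateKnownPeers only rewrites local peer state,
// ProposeKnownPeers is leader-only and also "replicates" the new state.
type role int

const (
	follower role = iota
	leader
)

type node struct {
	role  role
	peers []string
	sent  int // number of peer-state replications sent (stand-in for sendPeerState)
}

func (n *node) UpdateKnownPeers(knownPeers []string) {
	n.peers = append([]string(nil), knownPeers...)
}

func (n *node) ProposeKnownPeers(knownPeers []string) {
	if n.role != leader {
		return // non-leaders never propose/replicate peer state
	}
	n.UpdateKnownPeers(knownPeers)
	n.sent++ // stand-in for sendPeerState()
}

func main() {
	f := &node{role: follower}
	f.ProposeKnownPeers([]string{"A", "B", "C"})
	fmt.Printf("follower: peers=%v sent=%d\n", f.peers, f.sent) // peers=[] sent=0

	l := &node{role: leader}
	l.ProposeKnownPeers([]string{"A", "B", "C"})
	fmt.Printf("leader:   peers=%v sent=%d\n", l.peers, l.sent) // peers=[A B C] sent=1
}
```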
