diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index aa2bdaef0fd..5ef1d7c909d 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -3776,6 +3776,89 @@ func TestJetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace(t *testing streamCurrent(2) } +func TestJetStreamClusterPeerRemovalAndServerBroughtBack(t *testing.T) { + // Speed up for this test + peerRemoveTimeout = 2 * time.Second + defer func() { + peerRemoveTimeout = peerRemoveTimeoutDefault + }() + + c := createJetStreamClusterExplicit(t, "R5S", 5) + defer c.shutdown() + + // Client based API + ml := c.leader() + nc, err := nats.Connect(ml.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + if err != nil { + t.Fatalf("Failed to create system client: %v", err) + } + defer nc.Close() + + getPeersCount := func() int { + js := ml.getJetStream() + if js == nil { + return 0 + } + js.mu.RLock() + defer js.mu.RUnlock() + + cc := js.cluster + if !cc.isLeader() || cc.meta == nil { + return 0 + } + return len(cc.meta.Peers()) + } + + checkFor(t, 2*time.Second, 250*time.Millisecond, func() error { + if l := getPeersCount(); l != 5 { + return fmt.Errorf("expected 5 peers, got %d", l) + } + return nil + }) + + // Shutdown server first. + rs := c.randomNonLeader() + rs.Shutdown() + + // Peers should still remain the same, even if one server is shut down. + checkFor(t, 2*time.Second, 250*time.Millisecond, func() error { + if l := getPeersCount(); l != 5 { + return fmt.Errorf("expected 5 peers, got %d", l) + } + return nil + }) + + // Peer-remove after shutdown. + req := &JSApiMetaServerRemoveRequest{Server: rs.Name()} + jsreq, err := json.Marshal(req) + require_NoError(t, err) + rmsg, err := nc.Request(JSApiRemoveServer, jsreq, time.Second) + require_NoError(t, err) + + var resp JSApiMetaServerRemoveResponse + require_NoError(t, json.Unmarshal(rmsg.Data, &resp)) + if resp.Error != nil { + t.Fatalf("Unexpected error: %+v", resp.Error) + } + + // Peer should be removed. + checkFor(t, 2*time.Second, 250*time.Millisecond, func() error { + if l := getPeersCount(); l != 4 { + return fmt.Errorf("expected 4 peers, got %d", l) + } + return nil + }) + + // Bringing back the server should re-add to peers after peer-remove timeout. + c.restartServer(rs) + checkFor(t, 5*time.Second, 250*time.Millisecond, func() error { + if l := getPeersCount(); l != 5 { + return fmt.Errorf("expected 5 peers, got %d", l) + } + return nil + }) +} + func TestJetStreamClusterPeerExclusionTag(t *testing.T) { c := createJetStreamClusterWithTemplateAndModHook(t, jsClusterTempl, "C", 3, func(serverName, clusterName, storeDir, conf string) string { diff --git a/server/raft.go b/server/raft.go index 16a1b3cd720..95351ac5b87 100644 --- a/server/raft.go +++ b/server/raft.go @@ -154,7 +154,7 @@ type raft struct { qn int // Number of nodes needed to establish quorum peers map[string]*lps // Other peers in the Raft group - removed map[string]struct{} // Peers that were removed from the group + removed map[string]time.Time // Peers that were removed from the group acks map[uint64]map[string]struct{} // Append entry responses/acks, map of entry index -> peer ID pae map[uint64]*appendEntry // Pending append entries @@ -253,6 +253,7 @@ const ( lostQuorumIntervalDefault = hbIntervalDefault * 10 // 10 seconds lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds observerModeIntervalDefault = 48 * time.Hour + peerRemoveTimeoutDefault = 5 * time.Minute ) var ( @@ -264,6 +265,7 @@ var ( lostQuorumInterval = lostQuorumIntervalDefault lostQuorumCheck = lostQuorumCheckIntervalDefault observerModeInterval = observerModeIntervalDefault + peerRemoveTimeout = peerRemoveTimeoutDefault ) type RaftConfig struct { @@ -892,9 +894,9 @@ func (n *raft) ProposeAddPeer(peer string) error { func (n *raft) doRemovePeerAsLeader(peer string) { n.Lock() if n.removed == nil { - n.removed = map[string]struct{}{} + n.removed = map[string]time.Time{} } - n.removed[peer] = struct{}{} + n.removed[peer] = time.Now() if _, ok := n.peers[peer]; ok { delete(n.peers, peer) // We should decrease our cluster size since we are tracking this peer and the peer is most likely already gone. @@ -2968,9 +2970,9 @@ func (n *raft) applyCommit(index uint64) error { // Make sure we have our removed map. if n.removed == nil { - n.removed = make(map[string]struct{}) + n.removed = make(map[string]time.Time) } - n.removed[peer] = struct{}{} + n.removed[peer] = time.Now() if _, ok := n.peers[peer]; ok { delete(n.peers, peer) @@ -3085,8 +3087,13 @@ func (n *raft) adjustClusterSizeAndQuorum() { func (n *raft) trackPeer(peer string) error { n.Lock() var needPeerAdd, isRemoved bool + var rts time.Time if n.removed != nil { - _, isRemoved = n.removed[peer] + rts, isRemoved = n.removed[peer] + // Removed peers can rejoin after timeout. + if isRemoved && time.Since(rts) >= peerRemoveTimeout { + isRemoved = false + } } if n.State() == Leader { if lp, ok := n.peers[peer]; !ok || !lp.kp {