Skip to content

[FIXED] Server peer re-add after peer-remove #6815

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3776,6 +3776,89 @@ func TestJetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace(t *testing
streamCurrent(2)
}

func TestJetStreamClusterPeerRemovalAndServerBroughtBack(t *testing.T) {
// Speed up for this test
peerRemoveTimeout = 2 * time.Second
defer func() {
peerRemoveTimeout = peerRemoveTimeoutDefault
}()

c := createJetStreamClusterExplicit(t, "R5S", 5)
defer c.shutdown()

// Client based API
ml := c.leader()
nc, err := nats.Connect(ml.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
if err != nil {
t.Fatalf("Failed to create system client: %v", err)
}
defer nc.Close()

getPeersCount := func() int {
js := ml.getJetStream()
if js == nil {
return 0
}
js.mu.RLock()
defer js.mu.RUnlock()

cc := js.cluster
if !cc.isLeader() || cc.meta == nil {
return 0
}
return len(cc.meta.Peers())
}

checkFor(t, 2*time.Second, 250*time.Millisecond, func() error {
if l := getPeersCount(); l != 5 {
return fmt.Errorf("expected 5 peers, got %d", l)
}
return nil
})

// Shutdown server first.
rs := c.randomNonLeader()
rs.Shutdown()

// Peers should still remain the same, even if one server is shut down.
checkFor(t, 2*time.Second, 250*time.Millisecond, func() error {
if l := getPeersCount(); l != 5 {
return fmt.Errorf("expected 5 peers, got %d", l)
}
return nil
})

// Peer-remove after shutdown.
req := &JSApiMetaServerRemoveRequest{Server: rs.Name()}
jsreq, err := json.Marshal(req)
require_NoError(t, err)
rmsg, err := nc.Request(JSApiRemoveServer, jsreq, time.Second)
require_NoError(t, err)

var resp JSApiMetaServerRemoveResponse
require_NoError(t, json.Unmarshal(rmsg.Data, &resp))
if resp.Error != nil {
t.Fatalf("Unexpected error: %+v", resp.Error)
}

// Peer should be removed.
checkFor(t, 2*time.Second, 250*time.Millisecond, func() error {
if l := getPeersCount(); l != 4 {
return fmt.Errorf("expected 4 peers, got %d", l)
}
return nil
})

// Bringing back the server should re-add to peers after peer-remove timeout.
c.restartServer(rs)
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
if l := getPeersCount(); l != 5 {
return fmt.Errorf("expected 5 peers, got %d", l)
}
return nil
})
}

func TestJetStreamClusterPeerExclusionTag(t *testing.T) {
c := createJetStreamClusterWithTemplateAndModHook(t, jsClusterTempl, "C", 3,
func(serverName, clusterName, storeDir, conf string) string {
Expand Down
19 changes: 13 additions & 6 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type raft struct {
qn int // Number of nodes needed to establish quorum
peers map[string]*lps // Other peers in the Raft group

removed map[string]struct{} // Peers that were removed from the group
removed map[string]time.Time // Peers that were removed from the group
acks map[uint64]map[string]struct{} // Append entry responses/acks, map of entry index -> peer ID
pae map[uint64]*appendEntry // Pending append entries

Expand Down Expand Up @@ -253,6 +253,7 @@ const (
lostQuorumIntervalDefault = hbIntervalDefault * 10 // 10 seconds
lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds
observerModeIntervalDefault = 48 * time.Hour
peerRemoveTimeoutDefault = 5 * time.Minute
)

var (
Expand All @@ -264,6 +265,7 @@ var (
lostQuorumInterval = lostQuorumIntervalDefault
lostQuorumCheck = lostQuorumCheckIntervalDefault
observerModeInterval = observerModeIntervalDefault
peerRemoveTimeout = peerRemoveTimeoutDefault
)

type RaftConfig struct {
Expand Down Expand Up @@ -892,9 +894,9 @@ func (n *raft) ProposeAddPeer(peer string) error {
func (n *raft) doRemovePeerAsLeader(peer string) {
n.Lock()
if n.removed == nil {
n.removed = map[string]struct{}{}
n.removed = map[string]time.Time{}
}
n.removed[peer] = struct{}{}
n.removed[peer] = time.Now()
if _, ok := n.peers[peer]; ok {
delete(n.peers, peer)
// We should decrease our cluster size since we are tracking this peer and the peer is most likely already gone.
Expand Down Expand Up @@ -2968,9 +2970,9 @@ func (n *raft) applyCommit(index uint64) error {

// Make sure we have our removed map.
if n.removed == nil {
n.removed = make(map[string]struct{})
n.removed = make(map[string]time.Time)
}
n.removed[peer] = struct{}{}
n.removed[peer] = time.Now()

if _, ok := n.peers[peer]; ok {
delete(n.peers, peer)
Expand Down Expand Up @@ -3085,8 +3087,13 @@ func (n *raft) adjustClusterSizeAndQuorum() {
func (n *raft) trackPeer(peer string) error {
n.Lock()
var needPeerAdd, isRemoved bool
var rts time.Time
if n.removed != nil {
_, isRemoved = n.removed[peer]
rts, isRemoved = n.removed[peer]
// Removed peers can rejoin after timeout.
if isRemoved && time.Since(rts) >= peerRemoveTimeout {
isRemoved = false
}
}
if n.State() == Leader {
if lp, ok := n.peers[peer]; !ok || !lp.kp {
Expand Down