[FIXED] Stream peers drift after server peer-remove #6720

Merged · 1 commit · Mar 25, 2025
17 changes: 17 additions & 0 deletions server/jetstream_cluster.go
@@ -2037,7 +2037,24 @@ retry:
samePeers = slices.Equal(groupPeerIDs, nodePeerIDs)
}
if !samePeers {
// At this point we have no way of knowing:
// 1. Whether the group has lost enough nodes to cause a quorum
// loss, in which case a proposal may fail, therefore we will
// force a peerstate write;
// 2. Whether nodes in the group have other applies queued up
// that could change the peerstate again, therefore the leader
// should also send out a new proposal, just to make sure
// that this change gets captured in the log.
node.UpdateKnownPeers(groupPeerIDs)

// If the peers changed as a result of an update by the meta layer, we must reflect that in the log of
// this group. Otherwise, a new peer would come up and instantly reset the peer state back to whatever is
// in the log at that time, overwriting what the meta layer told it.
// We will need to address this properly later on, for example by having the meta layer decide the new
// placement but letting the leader of this group propose it through its own log instead.
if node.Leader() {
node.ProposeKnownPeers(groupPeerIDs)
}
}
rg.node = node
js.mu.Unlock()
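
The core pattern of the fix above is worth calling out: when the meta layer hands a group a new peer set, updating each node's in-memory peer list is not enough, because a replica that later replays the group's log would reset its membership to whatever the log last recorded. The leader therefore also proposes the peer set through the group's own log. The sketch below illustrates that pattern against a hypothetical RaftNode interface; only UpdateKnownPeers, ProposeKnownPeers and Leader mirror methods visible in the diff, the rest is an assumption for illustration and not the server's actual API.

// Hypothetical, minimal illustration of the pattern applied in the hunk above.
type RaftNode interface {
	Leader() bool
	UpdateKnownPeers(peers []string)  // update in-memory peer state only
	ProposeKnownPeers(peers []string) // replicate the peer state via the group's log
}

// applyMetaPeerChange applies a peer set handed down by the meta layer.
func applyMetaPeerChange(node RaftNode, groupPeerIDs []string) {
	// Always update the local view so this node acts on the new membership.
	node.UpdateKnownPeers(groupPeerIDs)
	// Only the leader proposes, so the change is captured exactly once in the
	// group's log and replayed by peers that catch up later.
	if node.Leader() {
		node.ProposeKnownPeers(groupPeerIDs)
	}
}
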
106 changes: 106 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -5088,3 +5088,109 @@ func TestJetStreamClusterTTLAndDedupe(t *testing.T) {
_, err = js.PublishMsg(m)
require_NoError(t, err)
}

func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R4S", 4)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Retention: nats.LimitsPolicy,
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

var acc *Account
var mset *stream

// Wait for 3 of the 4 servers to have created the stream.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
count := 0
for _, s := range c.servers {
acc, err = s.lookupAccount(globalAccountName)
if err != nil {
return err
}
_, err = acc.lookupStream("TEST")
if err != nil {
continue
}
count++
}
if count != 3 {
return fmt.Errorf("expected 3 streams, got: %d", count)
}
return nil
})

sl := c.streamLeader(globalAccountName, "TEST")

// Get a random server that:
// - is not stream leader
// - is not meta leader (peer-removing the meta leader has other issues)
// - already hosts the stream so a peer-remove results in changing the stream peer set
var rs *Server
for _, s := range c.servers {
acc, err = s.lookupAccount(globalAccountName)
require_NoError(t, err)
_, err = acc.lookupStream("TEST")
if s == sl || s.isMetaLeader.Load() || err != nil {
continue
}
rs = s
break
}
if rs == nil {
t.Fatal("No server found that hosts the stream and is neither stream nor meta leader.")
}
rs.Shutdown()

// Peer-remove the selected server so the stream moves to the remaining empty server.
sc, err := nats.Connect(sl.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
req := &JSApiMetaServerRemoveRequest{Server: rs.Name()}
jsreq, err := json.Marshal(req)
require_NoError(t, err)
_, err = sc.Request(JSApiRemoveServer, jsreq, time.Second)
require_NoError(t, err)

// Eventually there should again be an R3 stream and everyone should agree on the peers.
checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
count := 0
var ps []string
for _, s := range c.servers {
if s == rs {
continue
}
acc, err = s.lookupAccount(globalAccountName)
if err != nil {
return err
}
mset, err = acc.lookupStream("TEST")
if err != nil {
return err
}
rn := mset.raftNode().(*raft)
rn.RLock()
peerNames := rn.peerNames()
rn.RUnlock()
slices.Sort(peerNames)
if count == 0 {
ps = peerNames
} else if !slices.Equal(ps, peerNames) {
rsid := rs.NodeName()
containsOld := slices.Contains(ps, rsid) || slices.Contains(peerNames, rsid)
return fmt.Errorf("no equal peers, expected: %v, got: %v, contains old peer (%s): %v", ps, peerNames, rsid, containsOld)
}
count++
}
if count != 3 {
return fmt.Errorf("expected 3 servers hosting stream, got: %d", count)
}
return nil
})
}
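
To exercise this scenario in isolation against a checkout of the server repository, something like go test -run TestJetStreamClusterServerPeerRemovePeersDrift ./server/ should work, though the exact invocation depends on the local test setup.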