
Commit 273bd5a

[FIXED] Stream peers drift after server peer-remove
Signed-off-by: Maurice van Veen <[email protected]>
Co-authored-by: Neil Twigg <[email protected]>

Parent: 98afa2b

2 files changed: 123 additions, 0 deletions

2 files changed

+123
-0
lines changed

server/jetstream_cluster.go (17 additions, 0 deletions)

@@ -2037,7 +2037,24 @@ retry:
 		samePeers = slices.Equal(groupPeerIDs, nodePeerIDs)
 	}
 	if !samePeers {
+		// At this point we have no way of knowing:
+		// 1. Whether the group has lost enough nodes to cause a quorum
+		//    loss, in which case a proposal may fail, therefore we will
+		//    force a peerstate write;
+		// 2. Whether nodes in the group have other applies queued up
+		//    that could change the peerstate again, therefore the leader
+		//    should send out a new proposal anyway too just to make sure
+		//    that this change gets captured in the log.
 		node.UpdateKnownPeers(groupPeerIDs)
+
+		// If the peers changed as a result of an update by the meta layer, we must reflect that in the log of
+		// this group. Otherwise, a new peer would come up and instantly reset the peer state back to whatever is
+		// in the log at that time, overwriting what the meta layer told it.
+		// Will need to address this properly later on, by for example having the meta layer decide the new
+		// placement, but have the leader of this group propose it through its own log instead.
+		if node.Leader() {
+			node.ProposeKnownPeers(groupPeerIDs)
+		}
 	}
 	rg.node = node
 	js.mu.Unlock()
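For readers skimming the hunk above, the fix boils down to: apply the meta layer's peer set locally on every node, and if this node leads the group, also propose that peer set through the group's own log. Below is a minimal, self-contained Go sketch of that pattern. Only `UpdateKnownPeers`, `ProposeKnownPeers` and `Leader` come from the diff; the `GroupNode` interface, `fakeNode` type and `syncGroupPeers` helper are hypothetical stand-ins, not the server's actual types.

```go
// Sketch only. UpdateKnownPeers, ProposeKnownPeers and Leader mirror the
// calls in the hunk above; everything else here is a hypothetical stand-in.
package main

import "fmt"

// GroupNode is a simplified stand-in for the raft node backing a stream's group.
type GroupNode interface {
	Leader() bool
	UpdateKnownPeers(peers []string)
	ProposeKnownPeers(peers []string)
}

// syncGroupPeers applies a peer set decided by the meta layer: every node
// records it locally, and the group leader additionally proposes it through
// the group's own log so later peers replay the change instead of restoring
// a stale peer state from the old log.
func syncGroupPeers(n GroupNode, groupPeerIDs []string) {
	n.UpdateKnownPeers(groupPeerIDs)
	if n.Leader() {
		n.ProposeKnownPeers(groupPeerIDs)
	}
}

// fakeNode is a trivial in-memory implementation used only for illustration.
type fakeNode struct {
	leader bool
	known  []string
	log    [][]string
}

func (f *fakeNode) Leader() bool                     { return f.leader }
func (f *fakeNode) UpdateKnownPeers(peers []string)  { f.known = peers }
func (f *fakeNode) ProposeKnownPeers(peers []string) { f.log = append(f.log, peers) }

func main() {
	n := &fakeNode{leader: true}
	syncGroupPeers(n, []string{"S1", "S2", "S3"})
	fmt.Println(n.known, n.log) // [S1 S2 S3] [[S1 S2 S3]]
}
```

Proposing through the group's log is what prevents the drift named in the commit title: a peer that joins or restarts later replays the proposal instead of resetting the peer state to whatever the older log contained.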

server/jetstream_cluster_4_test.go (106 additions, 0 deletions)

@@ -5088,3 +5088,109 @@ func TestJetStreamClusterTTLAndDedupe(t *testing.T) {
 	_, err = js.PublishMsg(m)
 	require_NoError(t, err)
 }
+
+func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R4S", 4)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Retention: nats.LimitsPolicy,
+		Subjects:  []string{"foo"},
+		Replicas:  3,
+	})
+	require_NoError(t, err)
+
+	var acc *Account
+	var mset *stream
+
+	// Wait for 3 of the 4 servers to have created the stream.
+	checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+		count := 0
+		for _, s := range c.servers {
+			acc, err = s.lookupAccount(globalAccountName)
+			if err != nil {
+				return err
+			}
+			_, err = acc.lookupStream("TEST")
+			if err != nil {
+				continue
+			}
+			count++
+		}
+		if count != 3 {
+			return fmt.Errorf("expected 3 streams, got: %d", count)
+		}
+		return nil
+	})
+
+	sl := c.streamLeader(globalAccountName, "TEST")
+
+	// Get a random server that:
+	// - is not stream leader
+	// - is not meta leader (peer-removing the meta leader has other issues)
+	// - already hosts the stream so a peer-remove results in changing the stream peer set
+	var rs *Server
+	for _, s := range c.servers {
+		acc, err = s.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		_, err = acc.lookupStream("TEST")
+		if s == sl || s.isMetaLeader.Load() || err != nil {
+			continue
+		}
+		rs = s
+		break
+	}
+	if rs == nil {
+		t.Fatal("No server found that's not either stream or meta leader.")
+	}
+	rs.Shutdown()
+
+	// Peer-remove the selected server so the stream moves to the remaining empty server.
+	sc, err := nats.Connect(sl.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
+	require_NoError(t, err)
+	req := &JSApiMetaServerRemoveRequest{Server: rs.Name()}
+	jsreq, err := json.Marshal(req)
+	require_NoError(t, err)
+	_, err = sc.Request(JSApiRemoveServer, jsreq, time.Second)
+	require_NoError(t, err)
+
+	// Eventually there should again be a R3 stream and everyone should agree on the peers.
+	checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+		count := 0
+		var ps []string
+		for _, s := range c.servers {
+			if s == rs {
+				continue
+			}
+			acc, err = s.lookupAccount(globalAccountName)
+			if err != nil {
+				return err
+			}
+			mset, err = acc.lookupStream("TEST")
+			if err != nil {
+				return err
+			}
+			rn := mset.raftNode().(*raft)
+			rn.RLock()
+			peerNames := rn.peerNames()
+			rn.RUnlock()
+			slices.Sort(peerNames)
+			if count == 0 {
+				ps = peerNames
+			} else if !slices.Equal(ps, peerNames) {
+				rsid := rs.NodeName()
+				containsOld := slices.Contains(ps, rsid) || slices.Contains(peerNames, rsid)
+				return fmt.Errorf("no equal peers, expected: %v, got: %v, contains old peer (%s): %v", ps, peerNames, rsid, containsOld)
+			}
+			count++
+		}
+		if count != 3 {
+			return fmt.Errorf("expected 3 servers hosting stream, got: %d", count)
+		}
+		return nil
+	})
+}
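The test drives the peer-remove through the JetStream meta API directly, by publishing a JSApiMetaServerRemoveRequest to the JSApiRemoveServer subject as the system-account user. As a rough illustration of the same request outside the test harness, here is a hedged sketch of a standalone client; the URL, credentials, and server name are placeholders, and it assumes the exported JSApiRemoveServer constant and JSApiMetaServerRemoveRequest type from the server package are importable in your module.

```go
// Sketch only: peer-removing a server by name from a standalone client,
// mirroring the request the test sends. URL, credentials, and server name
// are placeholders for a system-account user on the target cluster.
package main

import (
	"encoding/json"
	"log"
	"time"

	"github.com/nats-io/nats-server/v2/server"
	"github.com/nats-io/nats.go"
)

func main() {
	// The server-remove API is a meta API, so connect with system-account credentials.
	nc, err := nats.Connect("nats://127.0.0.1:4222", nats.UserInfo("admin", "s3cr3t!"))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Same request type the test marshals.
	req, err := json.Marshal(&server.JSApiMetaServerRemoveRequest{
		Server: "server-to-remove", // placeholder: the peer's server name
	})
	if err != nil {
		log.Fatal(err)
	}

	// server.JSApiRemoveServer is the same subject the test publishes to.
	resp, err := nc.Request(server.JSApiRemoveServer, req, 2*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("peer-remove response: %s", resp.Data)
}
```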
