
Commit fd422dd

[FIXED] Stream peers drift after server peer-remove (#6720)
If a server is peer-removed and it hosted a stream, a new peer is selected for that stream. The new peer is given the new peer set by the meta layer, but it would then reset back to the old peer set while applying the stream's log. This is a short-term fix: the new peer set is also proposed through the stream's log, ensuring all replicas agree on it. Later we'll need to rework some parts so that the meta layer only 'suggests' changes, while the actual proposal goes through the stream's/consumer's log instead of the meta log.

Signed-off-by: Maurice van Veen <[email protected]>
Co-authored-by: Neil Twigg <[email protected]>
2 parents 693a27d + 273bd5a commit fd422dd
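
For context, the drift is triggered through the JetStream meta server-remove API, which the new test below exercises. Here is a minimal sketch of issuing that request from a system-account client, assuming the server package's test context for the JSApiRemoveServer constant and request type; the requestPeerRemove helper name, URL, and credentials are illustrative placeholders, not part of this commit (imports: encoding/json, time, github.com/nats-io/nats.go):

    // requestPeerRemove asks the meta leader to peer-remove a server by name,
    // mirroring what the test below does with JSApiRemoveServer.
    func requestPeerRemove(url, serverName string) error {
        // System-account credentials; placeholders matching the test cluster setup.
        nc, err := nats.Connect(url, nats.UserInfo("admin", "s3cr3t!"))
        if err != nil {
            return err
        }
        defer nc.Close()

        req, err := json.Marshal(&JSApiMetaServerRemoveRequest{Server: serverName})
        if err != nil {
            return err
        }
        // JSApiRemoveServer is the JetStream meta API subject for server removal.
        _, err = nc.Request(JSApiRemoveServer, req, time.Second)
        return err
    }

Once the meta leader processes the removal, the stream is re-assigned to a remaining server, which is where the peer-set drift used to appear.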

File tree

2 files changed: +123 -0


server/jetstream_cluster.go

Lines changed: 17 additions & 0 deletions
@@ -2037,7 +2037,24 @@ retry:
 		samePeers = slices.Equal(groupPeerIDs, nodePeerIDs)
 	}
 	if !samePeers {
+		// At this point we have no way of knowing:
+		// 1. Whether the group has lost enough nodes to cause a quorum
+		//    loss, in which case a proposal may fail, therefore we will
+		//    force a peerstate write;
+		// 2. Whether nodes in the group have other applies queued up
+		//    that could change the peerstate again, therefore the leader
+		//    should send out a new proposal anyway, just to make sure
+		//    that this change gets captured in the log.
 		node.UpdateKnownPeers(groupPeerIDs)
+
+		// If the peers changed as a result of an update by the meta layer, we must reflect that in the log of
+		// this group. Otherwise, a new peer would come up and instantly reset the peer state back to whatever is
+		// in the log at that time, overwriting what the meta layer told it.
+		// Will need to address this properly later on, for example by having the meta layer decide the new
+		// placement but having the leader of this group propose it through its own log instead.
+		if node.Leader() {
+			node.ProposeKnownPeers(groupPeerIDs)
+		}
 	}
 	rg.node = node
 	js.mu.Unlock()
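
As a side note, the samePeers check above uses slices.Equal, which is order-sensitive; the test below sorts peer names before comparing for the same reason. A small order-insensitive helper for comparing peer sets, purely illustrative and not part of this change (peerSetsEqual is a hypothetical name; uses the standard library slices package):

    // peerSetsEqual reports whether two peer-ID sets contain the same IDs,
    // ignoring order. Cloning before sorting avoids mutating the callers' slices.
    func peerSetsEqual(a, b []string) bool {
        if len(a) != len(b) {
            return false
        }
        as, bs := slices.Clone(a), slices.Clone(b)
        slices.Sort(as)
        slices.Sort(bs)
        return slices.Equal(as, bs)
    }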

server/jetstream_cluster_4_test.go

Lines changed: 106 additions & 0 deletions
@@ -5088,3 +5088,109 @@ func TestJetStreamClusterTTLAndDedupe(t *testing.T) {
 	_, err = js.PublishMsg(m)
 	require_NoError(t, err)
 }
+
+func TestJetStreamClusterServerPeerRemovePeersDrift(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R4S", 4)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Retention: nats.LimitsPolicy,
+		Subjects:  []string{"foo"},
+		Replicas:  3,
+	})
+	require_NoError(t, err)
+
+	var acc *Account
+	var mset *stream
+
+	// Wait for 3 of the 4 servers to have created the stream.
+	checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+		count := 0
+		for _, s := range c.servers {
+			acc, err = s.lookupAccount(globalAccountName)
+			if err != nil {
+				return err
+			}
+			_, err = acc.lookupStream("TEST")
+			if err != nil {
+				continue
+			}
+			count++
+		}
+		if count != 3 {
+			return fmt.Errorf("expected 3 streams, got: %d", count)
+		}
+		return nil
+	})
+
+	sl := c.streamLeader(globalAccountName, "TEST")
+
+	// Get a random server that:
+	// - is not stream leader
+	// - is not meta leader (peer-removing the meta leader has other issues)
+	// - already hosts the stream so a peer-remove results in changing the stream peer set
+	var rs *Server
+	for _, s := range c.servers {
+		acc, err = s.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		_, err = acc.lookupStream("TEST")
+		if s == sl || s.isMetaLeader.Load() || err != nil {
+			continue
+		}
+		rs = s
+		break
+	}
+	if rs == nil {
+		t.Fatal("No server found that's not either stream or meta leader.")
+	}
+	rs.Shutdown()
+
+	// Peer-remove the selected server so the stream moves to the remaining empty server.
+	sc, err := nats.Connect(sl.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
+	require_NoError(t, err)
+	req := &JSApiMetaServerRemoveRequest{Server: rs.Name()}
+	jsreq, err := json.Marshal(req)
+	require_NoError(t, err)
+	_, err = sc.Request(JSApiRemoveServer, jsreq, time.Second)
+	require_NoError(t, err)
+
+	// Eventually there should again be a R3 stream and everyone should agree on the peers.
+	checkFor(t, 2*time.Second, 100*time.Millisecond, func() error {
+		count := 0
+		var ps []string
+		for _, s := range c.servers {
+			if s == rs {
+				continue
+			}
+			acc, err = s.lookupAccount(globalAccountName)
+			if err != nil {
+				return err
+			}
+			mset, err = acc.lookupStream("TEST")
+			if err != nil {
+				return err
+			}
+			rn := mset.raftNode().(*raft)
+			rn.RLock()
+			peerNames := rn.peerNames()
+			rn.RUnlock()
+			slices.Sort(peerNames)
+			if count == 0 {
+				ps = peerNames
+			} else if !slices.Equal(ps, peerNames) {
+				rsid := rs.NodeName()
+				containsOld := slices.Contains(ps, rsid) || slices.Contains(peerNames, rsid)
+				return fmt.Errorf("no equal peers, expected: %v, got: %v, contains old peer (%s): %v", ps, peerNames, rsid, containsOld)
+			}
+			count++
+		}
+		if count != 3 {
+			return fmt.Errorf("expected 3 servers hosting stream, got: %d", count)
+		}
+		return nil
+	})
+}
