Skip to content

Commit c10a3e9

Browse files
NRG: Truncate down to committed on step down
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 4541eea commit c10a3e9

File tree

4 files changed

+133
-6
lines changed

4 files changed

+133
-6
lines changed

server/jetstream_cluster_1_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8495,6 +8495,9 @@ func TestJetStreamClusterSnapshotStreamAssetOnShutdown(t *testing.T) {
84958495
// Publish, so we have something new to snapshot.
84968496
_, err = js.Publish("foo", nil)
84978497
require_NoError(t, err)
8498+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
8499+
return checkState(t, c, globalAccountName, "TEST")
8500+
})
84988501

84998502
// Shutdown servers, and check if all made stream snapshots.
85008503
for _, s := range c.servers {

server/raft.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ type raft struct {
169169
pindex uint64 // Previous index from the last snapshot
170170
commit uint64 // Index of the most recent commit
171171
applied uint64 // Index of the most recently applied commit
172+
lterm uint64 // Term of the entry before this leader's term started.
173+
lindex uint64 // Index of the entry before this leader's term started
172174

173175
aflr uint64 // Index when to signal initial messages have been applied after becoming leader. 0 means signaling is disabled.
174176

@@ -188,7 +190,7 @@ type raft struct {
188190
isSysAcc atomic.Bool // Are we utilizing the system account?
189191
maybeLeader bool // The group had a preferred leader. And is maybe already acting as leader prior to scale up.
190192

191-
observer bool // The node is observing, i.e. not participating in voting
193+
observer bool // The node is observing, i.e. not able to become leader
192194

193195
extSt extensionState // Extension state
194196

@@ -3303,6 +3305,16 @@ func (n *raft) truncateWAL(term, index uint64) {
33033305
n.pterm, n.pindex = term, index
33043306
}
33053307

3308+
// Truncate our WAL to remove all entries from the current leader's term if
3309+
// none were committed (no quorum). Keep all of them if at least one had quorum.
3310+
// Lock should be held.
3311+
func (n *raft) truncateWALForLeaderTerm() {
3312+
// Had at least one entry in the current leader's term and none were committed.
3313+
if n.pindex > n.lindex && n.lindex >= n.commit {
3314+
n.truncateWAL(n.lterm, n.lindex)
3315+
}
3316+
}
3317+
33063318
// Reset our WAL. This is equivalent to truncating all data from the log.
33073319
// Lock should be held.
33083320
func (n *raft) resetWAL() {
@@ -4290,6 +4302,8 @@ retry:
42904302
var leadChange bool
42914303
if pstate == Leader && state != Leader {
42924304
leadChange = true
4305+
// Truncate entries from this leader's term if none had quorum.
4306+
n.truncateWALForLeaderTerm()
42934307
n.updateLeadChange(false)
42944308
// Drain the append entry response and proposal queues.
42954309
n.resp.drain()
@@ -4383,6 +4397,8 @@ func (n *raft) switchToLeader() {
43834397
sendHB := state.LastSeq > n.commit
43844398

43854399
n.lxfer = false
4400+
n.lterm = n.pterm
4401+
n.lindex = n.pindex
43864402
n.updateLeader(n.id)
43874403
leadChange := n.switchState(Leader)
43884404

server/raft_helpers_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package server
1818

1919
import (
2020
"encoding/binary"
21+
"errors"
2122
"fmt"
2223
"math/rand"
2324
"sync"
@@ -329,14 +330,15 @@ func (a *stateAdder) snapshot(t *testing.T) {
329330
func (rg smGroup) waitOnTotal(t *testing.T, expected int64) {
330331
t.Helper()
331332
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
333+
var err error
332334
for _, sm := range rg {
333335
asm := sm.(*stateAdder)
334336
if total := asm.total(); total != expected {
335-
return fmt.Errorf("Adder on %v has wrong total: %d vs %d",
336-
asm.server(), total, expected)
337+
err = errors.Join(err, fmt.Errorf("Adder on %v has wrong total: %d vs %d",
338+
asm.server(), total, expected))
337339
}
338340
}
339-
return nil
341+
return err
340342
})
341343
}
342344

server/raft_test.go

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -913,22 +913,29 @@ func TestNRGNoResetOnAppendEntryResponse(t *testing.T) {
913913
follower := rg.nonLeader().node().(*raft)
914914
lsm := rg.leader().(*stateAdder)
915915

916+
// Ensure all servers can get quorum on an initial set.
917+
c.waitOnAllCurrent()
918+
lsm.proposeDelta(5)
919+
rg.waitOnTotal(t, 5)
920+
916921
// Subscribe for append entries that aren't heartbeats and respond to
917922
// each of them as though it's a non-success and with a higher term.
918923
// The higher term in this case is what would cause the leader previously
919924
// to reset the entire log which it shouldn't do.
925+
var once bool
920926
_, err := nc.Subscribe(fmt.Sprintf(raftAppendSubj, "TEST"), func(msg *nats.Msg) {
921-
if ae, err := follower.decodeAppendEntry(msg.Data, nil, msg.Reply); err == nil && len(ae.entries) > 0 {
927+
if ae, err := follower.decodeAppendEntry(msg.Data, nil, msg.Reply); err == nil && len(ae.entries) > 0 && !once {
922928
ar := newAppendEntryResponse(ae.term+1, ae.commit, follower.id, false)
923929
require_NoError(t, msg.Respond(ar.encode(nil)))
930+
once = true
924931
}
925932
})
926933
require_NoError(t, err)
927934

928935
// Generate an append entry that the subscriber above can respond to.
929936
c.waitOnAllCurrent()
930937
lsm.proposeDelta(5)
931-
rg.waitOnTotal(t, 5)
938+
rg.waitOnTotal(t, 10)
932939

933940
// The was-leader should now have stepped down, make sure that it
934941
// didn't blow away its log in the process.
@@ -2363,6 +2370,105 @@ func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) {
23632370
}
23642371
}
23652372

2373+
func TestNRGClearWALOnStepDownWhenZeroCommits(t *testing.T) {
2374+
n, cleanup := initSingleMemRaftNode(t)
2375+
defer cleanup()
2376+
2377+
// Create a sample entry, the content doesn't matter, just that it's stored.
2378+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
2379+
entries := []*Entry{newEntry(EntryNormal, esm)}
2380+
2381+
// Switch this node to leader, and send one entry. It will not get quorum.
2382+
n.term++
2383+
n.switchToLeader()
2384+
require_Equal(t, n.term, 1)
2385+
require_Equal(t, n.pindex, 0)
2386+
n.sendAppendEntry(entries)
2387+
require_Equal(t, n.pindex, 1)
2388+
require_Equal(t, n.pterm, 1)
2389+
2390+
// Shouldn't have any commits up to this point.
2391+
require_Equal(t, n.commit, 0)
2392+
2393+
// If we now step down, we clear our WAL, we had no commits.
2394+
n.stepdown(noLeader)
2395+
require_Equal(t, n.commit, 0)
2396+
require_Equal(t, n.pindex, 0)
2397+
require_Equal(t, n.pterm, 0)
2398+
}
2399+
2400+
func TestNRGTruncateDownToPreviousTermOnStepDown(t *testing.T) {
2401+
tests := []struct {
2402+
title string
2403+
commit uint64
2404+
}{
2405+
// The current leader's term is fully removed. None of them had quorum.
2406+
{title: "no-commits", commit: 0},
2407+
{title: "half-previous-term", commit: 1},
2408+
{title: "full-previous-term", commit: 2},
2409+
// At least one entry in the leader's term had quorum, need to preserve all of them.
2410+
{title: "half-leader-term", commit: 3},
2411+
}
2412+
for _, test := range tests {
2413+
t.Run(test.title, func(t *testing.T) {
2414+
n, cleanup := initSingleMemRaftNode(t)
2415+
defer cleanup()
2416+
2417+
// Create a sample entry, the content doesn't matter, just that it's stored.
2418+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
2419+
entries := []*Entry{newEntry(EntryNormal, esm)}
2420+
2421+
// Switch this node to leader, and send two entries. The first will get quorum, the second will not.
2422+
n.term++
2423+
n.switchToLeader()
2424+
require_Equal(t, n.term, 1)
2425+
require_Equal(t, n.pindex, 0)
2426+
n.sendAppendEntry(entries)
2427+
n.sendAppendEntry(entries)
2428+
require_Equal(t, n.pindex, 2)
2429+
require_Equal(t, n.pterm, 1)
2430+
2431+
// Shouldn't have any commits up to this point.
2432+
require_Equal(t, n.commit, 0)
2433+
2434+
// We get quorum on the first entry.
2435+
for i := uint64(1); i <= test.commit && i <= 2; i++ {
2436+
require_NoError(t, n.applyCommit(i))
2437+
require_Equal(t, n.commit, i)
2438+
}
2439+
2440+
// Act like the next entry is sent under a new term.
2441+
// That will verify we can revert back to the pterm at time of last commit.
2442+
n.term++
2443+
n.switchToLeader()
2444+
n.sendAppendEntry(entries)
2445+
n.sendAppendEntry(entries)
2446+
require_Equal(t, n.pindex, 4)
2447+
require_Equal(t, n.pterm, 2)
2448+
2449+
if test.commit == 3 {
2450+
require_NoError(t, n.applyCommit(3))
2451+
require_Equal(t, n.commit, 3)
2452+
}
2453+
2454+
// If we now step down, we should remove all entries from the leader's term if none had quorum.
2455+
// This is required, otherwise we could try to become leader again and trick a follower into
2456+
// voting for us based on a pterm without quorum. That could result in overwriting an entry
2457+
// at a pindex where a previous leader knew we had quorum on, but the follower didn't.
2458+
n.stepdown(noLeader)
2459+
2460+
require_Equal(t, n.commit, test.commit)
2461+
if test.commit < 3 {
2462+
require_Equal(t, n.pindex, 2)
2463+
require_Equal(t, n.pterm, 1)
2464+
} else {
2465+
require_Equal(t, n.pindex, 4)
2466+
require_Equal(t, n.pterm, 2)
2467+
}
2468+
})
2469+
}
2470+
}
2471+
23662472
// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
23672473
// proposing the next one.
23682474
// The test may fail if:

0 commit comments

Comments
 (0)