Skip to content

Commit d323e23

Browse files
NRG: Only continue if aligned with pindex/pterm (#6271)
If truncation fails we would spin on retrying. Now only continue if `pindex/pterm` are aligned, otherwise responds non-success to the leader. Signed-off-by: Maurice van Veen <[email protected]>
2 parents e425a9d + 39ebc97 commit d323e23

File tree

2 files changed

+61
-3
lines changed

2 files changed

+61
-3
lines changed

server/raft.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3401,7 +3401,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
34013401
n.updateLeadChange(false)
34023402
}
34033403

3404-
RETRY:
34053404
if ae.pterm != n.pterm || ae.pindex != n.pindex {
34063405
// Check if this is a lower or equal index than what we were expecting.
34073406
if ae.pindex <= n.pindex {
@@ -3427,8 +3426,11 @@ RETRY:
34273426
}
34283427
} else if eae.term == ae.pterm {
34293428
// If terms match we can delete all entries past this one, and then continue storing the current entry.
3430-
n.truncateWAL(eae.term, eae.pindex+1)
3431-
goto RETRY
3429+
n.truncateWAL(ae.pterm, ae.pindex)
3430+
// Only continue if truncation was successful, and we ended up such that we can safely continue.
3431+
if ae.pterm == n.pterm && ae.pindex == n.pindex {
3432+
goto CONTINUE
3433+
}
34323434
} else {
34333435
// If terms mismatched, delete that entry and all others past it.
34343436
// Make sure to cancel any catchups in progress.
@@ -3512,6 +3514,7 @@ RETRY:
35123514
return
35133515
}
35143516

3517+
CONTINUE:
35153518
// Save to our WAL if we have entries.
35163519
if ae.shouldStore() {
35173520
// Only store if an original which will have sub != nil

server/raft_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1634,6 +1634,61 @@ func TestNRGTruncateDownToCommitted(t *testing.T) {
16341634
require_Equal(t, n.commit, 2)
16351635
}
16361636

1637+
type mockWALTruncateAlwaysFails struct {
1638+
WAL
1639+
}
1640+
1641+
func (m mockWALTruncateAlwaysFails) Truncate(seq uint64) error {
1642+
return errors.New("test: truncate always fails")
1643+
}
1644+
1645+
func TestNRGTruncateDownToCommittedWhenTruncateFails(t *testing.T) {
1646+
n, cleanup := initSingleMemRaftNode(t)
1647+
defer cleanup()
1648+
1649+
n.Lock()
1650+
n.wal = mockWALTruncateAlwaysFails{n.wal}
1651+
n.Unlock()
1652+
1653+
// Create a sample entry, the content doesn't matter, just that it's stored.
1654+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
1655+
entries := []*Entry{newEntry(EntryNormal, esm)}
1656+
1657+
nats0 := "S1Nunr6R" // "nats-0"
1658+
nats1 := "yrzKKRBu" // "nats-1"
1659+
1660+
// Timeline, we are leader
1661+
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
1662+
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
1663+
1664+
// Timeline, after leader change
1665+
aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 0, pterm: 1, pindex: 1, entries: entries})
1666+
1667+
// Simply receive first message.
1668+
n.processAppendEntry(aeMsg1, n.aesub)
1669+
require_Equal(t, n.commit, 0)
1670+
require_Equal(t, n.wal.State().Msgs, 1)
1671+
entry, err := n.loadEntry(1)
1672+
require_NoError(t, err)
1673+
require_Equal(t, entry.leader, nats0)
1674+
1675+
// Receive second message, which commits the first message.
1676+
n.processAppendEntry(aeMsg2, n.aesub)
1677+
require_Equal(t, n.commit, 1)
1678+
require_Equal(t, n.wal.State().Msgs, 2)
1679+
entry, err = n.loadEntry(2)
1680+
require_NoError(t, err)
1681+
require_Equal(t, entry.leader, nats0)
1682+
1683+
// We receive an entry from another leader, should truncate down to commit / remove the second message.
1684+
// But, truncation fails so should register that and not change pindex/pterm.
1685+
bindex, bterm := n.pindex, n.pterm
1686+
n.processAppendEntry(aeMsg3, n.aesub)
1687+
require_Error(t, n.werr, errors.New("test: truncate always fails"))
1688+
require_Equal(t, bindex, n.pindex)
1689+
require_Equal(t, bterm, n.pterm)
1690+
}
1691+
16371692
func TestNRGForwardProposalResponse(t *testing.T) {
16381693
c := createJetStreamClusterExplicit(t, "R3S", 3)
16391694
defer c.shutdown()

0 commit comments

Comments
 (0)