Commit 97a807e

[FIXED] Desync after partial catchup from old leader
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 274ea2c commit 97a807e

2 files changed: +144, -4 lines changed


server/jetstream_cluster.go

Lines changed: 32 additions & 2 deletions
@@ -8618,7 +8618,28 @@ RETRY:
 		// Check for eof signaling.
 		if len(msg) == 0 {
 			msgsQ.recycle(&mrecs)
-			return nil
+
+			// Sanity check that we've received all data expected by the snapshot.
+			mset.mu.RLock()
+			lseq := mset.lseq
+			mset.mu.RUnlock()
+			if lseq >= snap.LastSeq {
+				return nil
+			}
+
+			// Make sure we do not spin and make things worse.
+			const minRetryWait = 2 * time.Second
+			elapsed := time.Since(reqSendTime)
+			if elapsed < minRetryWait {
+				select {
+				case <-s.quitCh:
+					return ErrServerNotRunning
+				case <-qch:
+					return errCatchupStreamStopped
+				case <-time.After(minRetryWait - elapsed):
+				}
+			}
+			goto RETRY
 		}
 		if _, err := mset.processCatchupMsg(msg); err == nil {
 			if mrec.reply != _EMPTY_ {
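Note: the essence of this hunk is a bounded retry. On EOF the follower no longer returns immediately; it checks whether its last applied sequence has reached the snapshot's last sequence, and if not it waits out the remainder of a 2-second minimum interval before jumping back to RETRY, so a stale leader cannot drive a tight request loop. Below is a minimal, self-contained sketch of that wait-then-retry shape; it is not the server's actual API, and requestCatchup, quitCh, and qch are illustrative stand-ins.

package main

import (
    "errors"
    "fmt"
    "time"
)

var errCatchupStopped = errors.New("catchup stopped")

// catchupLoop sketches the follower-side pattern: only accept an EOF once the
// target sequence has actually been reached, otherwise wait out what is left
// of minRetryWait before retrying so the loop can never spin.
func catchupLoop(requestCatchup func() (lastApplied, target uint64), quitCh, qch <-chan struct{}) error {
    const minRetryWait = 2 * time.Second
    for {
        reqSendTime := time.Now()
        lastApplied, target := requestCatchup()
        if lastApplied >= target {
            // Received all data expected by the snapshot.
            return nil
        }
        // Make sure we do not spin and make things worse.
        if elapsed := time.Since(reqSendTime); elapsed < minRetryWait {
            select {
            case <-quitCh:
                return errors.New("server not running")
            case <-qch:
                return errCatchupStopped
            case <-time.After(minRetryWait - elapsed):
            }
        }
    }
}

func main() {
    attempts := 0
    err := catchupLoop(func() (uint64, uint64) {
        attempts++
        if attempts < 2 {
            return 1, 2 // first attempt gets EOF after only seq 1 of 2
        }
        return 2, 2 // second attempt completes
    }, nil, nil)
    fmt.Println("attempts:", attempts, "err:", err)
}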
@@ -9122,6 +9143,15 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 		// In the latter case the request expects us to have more. Just continue and value availability here.
 		// This should only be possible if the logs have already desynced, and we shouldn't have become leader
 		// in the first place. Not much we can do here in this (hypothetical) scenario.
+
+		// Do another quick sanity check that we actually have enough data to satisfy the request.
+		// If not, let's step down and hope a new leader can correct this.
+		if state.LastSeq < last {
+			s.Warnf("Catchup for stream '%s > %s' skipped, requested sequence %d was larger than current state: %+v",
+				mset.account(), mset.name(), seq, state)
+			node.StepDown()
+			return
+		}
 	}
 
 	mset.setCatchupPeer(sreq.Peer, last-seq)
@@ -9241,7 +9271,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 			// The snapshot has a larger last sequence then we have. This could be due to a truncation
 			// when trying to recover after corruption, still not 100% sure. Could be off by 1 too somehow,
 			// but tested a ton of those with no success.
-			s.Warnf("Catchup for stream '%s > %s' completed, but requested sequence %d was larger then current state: %+v",
+			s.Warnf("Catchup for stream '%s > %s' completed, but requested sequence %d was larger than current state: %+v",
 				mset.account(), mset.name(), seq, state)
 			// Try our best to redo our invalidated snapshot as well.
 			if n := mset.raftNode(); n != nil {
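The runCatchup changes above add the complementary guard on the serving side: a (possibly stale) leader that cannot satisfy the requested last sequence steps down instead of streaming a partial catchup and signalling EOF. A rough sketch of that check follows, using simplified stand-in types rather than the server's real ones.

package main

import "fmt"

// Simplified stand-ins for the server's stream state, sync request, and Raft node.
type StreamState struct{ FirstSeq, LastSeq uint64 }

type SyncRequest struct{ FirstSeq, LastSeq uint64 }

type RaftNode interface{ StepDown() }

type noopNode struct{}

func (noopNode) StepDown() { fmt.Println("stepping down: cannot satisfy catchup request") }

// serveCatchup refuses (and steps down) when the requester asks for sequences
// beyond what this server's log actually holds.
func serveCatchup(state StreamState, sreq SyncRequest, node RaftNode) bool {
    if state.LastSeq < sreq.LastSeq {
        // We are behind the requester's snapshot; let a newer leader take over.
        node.StepDown()
        return false
    }
    // ... stream sequences sreq.FirstSeq through sreq.LastSeq to the requester ...
    return true
}

func main() {
    ok := serveCatchup(StreamState{FirstSeq: 1, LastSeq: 1}, SyncRequest{FirstSeq: 1, LastSeq: 2}, noopNode{})
    fmt.Println("served:", ok)
}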

server/jetstream_cluster_4_test.go

Lines changed: 112 additions & 2 deletions
@@ -3528,8 +3528,118 @@ func TestJetStreamClusterConsumerDesyncAfterErrorDuringStreamCatchup(t *testing.
 	c.waitOnConsumerLeader(globalAccountName, "TEST", "CONSUMER")
 
 	// Outdated server must NOT become the leader.
-	newConsummerLeaderServer := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
-	require_Equal(t, newConsummerLeaderServer.Name(), clusterResetServerName)
+	newConsumerLeaderServer := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
+	require_Equal(t, newConsumerLeaderServer.Name(), clusterResetServerName)
+}
+
+func TestJetStreamClusterDesyncAfterEofFromOldStreamLeader(t *testing.T) {
+	test := func(t *testing.T, eof bool) {
+		c := createJetStreamClusterExplicit(t, "R5S", 5)
+		defer c.shutdown()
+
+		cs := c.randomServer()
+		nc, js := jsClientConnect(t, cs)
+		defer nc.Close()
+
+		_, err := js.AddStream(&nats.StreamConfig{
+			Name:     "TEST",
+			Subjects: []string{"foo"},
+			Replicas: 5,
+		})
+		require_NoError(t, err)
+
+		sl := c.streamLeader(globalAccountName, "TEST")
+		var rs *Server
+		var catchup *Server
+		for _, s := range c.servers {
+			if s != sl && s != cs {
+				if rs == nil {
+					rs = s
+				} else {
+					catchup = s
+					break
+				}
+			}
+		}
+
+		// Shutdown server that needs to catch up, so it gets a snapshot from the leader after restart.
+		catchup.Shutdown()
+
+		// One message is received and applied by all replicas.
+		_, err = js.Publish("foo", nil)
+		require_NoError(t, err)
+		checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+			return checkState(t, c, globalAccountName, "TEST")
+		})
+
+		// Disable Raft and start cluster subs for server, simulating an old leader with an outdated log.
+		acc, err := rs.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		mset, err := acc.lookupStream("TEST")
+		require_NoError(t, err)
+		mset.startClusterSubs()
+		rn := mset.raftNode()
+		rn.Stop()
+		rn.WaitForStop()
+
+		// Temporarily disable cluster subs for this test.
+		// Normally due to multiple cluster subs responses will interleave, but this is simpler for this test.
+		acc, err = sl.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		mset, err = acc.lookupStream("TEST")
+		require_NoError(t, err)
+		mset.stopClusterSubs()
+
+		// Publish another message that the old leader will not get.
+		_, err = js.Publish("foo", nil)
+		require_NoError(t, err)
+		require_NoError(t, sl.JetStreamSnapshotStream(globalAccountName, "TEST"))
+
+		sa := sl.getJetStream().streamAssignment(globalAccountName, "TEST")
+		require_NotNil(t, sa)
+
+		// Send EOF immediately to requesting server, otherwise the server needs to time out and retry.
+		if eof {
+			snc, err := nats.Connect(rs.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
+			require_NoError(t, err)
+			defer snc.Close()
+
+			sub, err := snc.Subscribe(sa.Sync, func(msg *nats.Msg) {
+				// EOF
+				rs.sendInternalMsgLocked(msg.Reply, _EMPTY_, nil, nil)
+			})
+			require_NoError(t, err)
+			defer sub.Drain()
+		}
+
+		// Restart server so it starts catching up.
+		catchup = c.restartServer(catchup)
+
+		// Wait for server to start catching up.
+		// This shouldn't be a problem. The server should retry catchup, recognizing it wasn't caught up fully.
+		checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+			if a, err := catchup.lookupAccount(globalAccountName); err != nil {
+				return err
+			} else if m, err := a.lookupStream("TEST"); err != nil {
+				return err
+			} else if !m.isCatchingUp() {
+				return errors.New("stream not catching up")
+			}
+			return nil
+		})
+
+		// Stop old leader, and re-enable cluster subs on proper leader.
+		rs.Shutdown()
+		mset.startClusterSubs()
+
+		// Server should automatically restart catchup and get the missing data.
+		checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
+			return checkState(t, c, globalAccountName, "TEST")
+		})
+	}
+
+	t.Run("eof", func(t *testing.T) { test(t, true) })
+	t.Run("retry", func(t *testing.T) { test(t, false) })
 }
 
 func TestJetStreamClusterReservedResourcesAccountingAfterClusterReset(t *testing.T) {
