Skip to content

Commit 1725cad

Browse files
committed
NRG: Monotonic heartbeat and quorum tracking
Converting the timestamps to `UnixNano` discarded the monotonic clock reading, so all of these operations were using wall-clock rather than monotonic time and were not safe against wall-clock drift or NTP adjustments. This could result in unexpected loss of quorum. Storing the timestamps as `time.Time` and measuring elapsed time with `time.Since` keeps the comparisons on the monotonic clock. Signed-off-by: Neil Twigg <[email protected]>
1 parent b391ee9 commit 1725cad

File tree

2 files changed

+26
-31
lines changed

2 files changed

+26
-31
lines changed

server/monitor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4027,8 +4027,8 @@ func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus {
40274027
Known: p.kp,
40284028
LastReplicatedIndex: p.li,
40294029
}
4030-
if p.ts > 0 {
4031-
peer.LastSeen = time.Since(time.Unix(0, p.ts)).String()
4030+
if !p.ts.IsZero() {
4031+
peer.LastSeen = time.Since(p.ts).String()
40324032
}
40334033
info.Peers[id] = peer
40344034
}

server/raft.go

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -240,9 +240,9 @@ type catchupState struct {
240240

241241
// lps holds peer state of last time and last index replicated.
242242
type lps struct {
243-
ts int64 // Last timestamp
244-
li uint64 // Last index replicated
245-
kp bool // Known peer
243+
ts time.Time // Last timestamp
244+
li uint64 // Last index replicated
245+
kp bool // Known peer
246246
}
247247

248248
const (
@@ -502,13 +502,13 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
502502
}
503503

504504
// Make sure to track ourselves.
505-
n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true}
505+
n.peers[n.id] = &lps{time.Now(), 0, true}
506506

507507
// Track known peers
508508
for _, peer := range ps.knownPeers {
509509
if peer != n.id {
510510
// Set these to 0 to start but mark as known peer.
511-
n.peers[peer] = &lps{0, 0, true}
511+
n.peers[peer] = &lps{time.Time{}, 0, true}
512512
}
513513
}
514514

@@ -1446,9 +1446,8 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {
14461446

14471447
// Check to see that we have heard from the current leader lately.
14481448
if n.leader != noLeader && n.leader != n.id && n.catchup == nil {
1449-
okInterval := int64(hbInterval) * 2
1450-
ts := time.Now().UnixNano()
1451-
if ps := n.peers[n.leader]; ps == nil || ps.ts == 0 && (ts-ps.ts) > okInterval {
1449+
okInterval := hbInterval * 2
1450+
if ps := n.peers[n.leader]; ps == nil || time.Since(ps.ts) > okInterval {
14521451
n.debug("Not current, no recent leader contact")
14531452
return false
14541453
}
@@ -1586,14 +1585,12 @@ func (n *raft) StepDown(preferred ...string) error {
15861585
preferred = nil
15871586
}
15881587

1589-
nowts := time.Now().UnixNano()
1590-
15911588
// If we have a preferred check it first.
15921589
if maybeLeader != noLeader {
15931590
var isHealthy bool
15941591
if ps, ok := n.peers[maybeLeader]; ok {
15951592
si, ok := n.s.nodeToInfo.Load(maybeLeader)
1596-
isHealthy = ok && !si.(nodeInfo).offline && (nowts-ps.ts) < int64(hbInterval*3)
1593+
isHealthy = ok && !si.(nodeInfo).offline && time.Since(ps.ts) < hbInterval*3
15971594
}
15981595
if !isHealthy {
15991596
maybeLeader = noLeader
@@ -1608,7 +1605,7 @@ func (n *raft) StepDown(preferred ...string) error {
16081605
continue
16091606
}
16101607
si, ok := n.s.nodeToInfo.Load(peer)
1611-
isHealthy := ok && !si.(nodeInfo).offline && (nowts-ps.ts) < int64(hbInterval*3)
1608+
isHealthy := ok && !si.(nodeInfo).offline && time.Since(ps.ts) < hbInterval*3
16121609
if isHealthy {
16131610
maybeLeader = peer
16141611
break
@@ -1733,7 +1730,7 @@ func (n *raft) Peers() []*Peer {
17331730
p := &Peer{
17341731
ID: id,
17351732
Current: id == n.leader || ps.li >= n.applied,
1736-
Last: time.Unix(0, ps.ts),
1733+
Last: ps.ts,
17371734
Lag: lag,
17381735
}
17391736
peers = append(peers, p)
@@ -2633,11 +2630,10 @@ func (n *raft) Quorum() bool {
26332630
n.RLock()
26342631
defer n.RUnlock()
26352632

2636-
now, nc := time.Now().UnixNano(), 0
2633+
nc := 0
26372634
for id, peer := range n.peers {
2638-
if id == n.id || time.Duration(now-peer.ts) < lostQuorumInterval {
2639-
nc++
2640-
if nc >= n.qn {
2635+
if id == n.id || time.Since(peer.ts) < lostQuorumInterval {
2636+
if nc++; nc >= n.qn {
26412637
return true
26422638
}
26432639
}
@@ -2659,11 +2655,10 @@ func (n *raft) lostQuorumLocked() bool {
26592655
return false
26602656
}
26612657

2662-
now, nc := time.Now().UnixNano(), 0
2658+
nc := 0
26632659
for id, peer := range n.peers {
2664-
if id == n.id || time.Duration(now-peer.ts) < lostQuorumInterval {
2665-
nc++
2666-
if nc >= n.qn {
2660+
if id == n.id || time.Since(peer.ts) < lostQuorumInterval {
2661+
if nc++; nc >= n.qn {
26672662
return false
26682663
}
26692664
}
@@ -2979,7 +2974,7 @@ func (n *raft) applyCommit(index uint64) error {
29792974

29802975
if lp, ok := n.peers[newPeer]; !ok {
29812976
// We are not tracking this one automatically so we need to bump cluster size.
2982-
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, true}
2977+
n.peers[newPeer] = &lps{time.Now(), 0, true}
29832978
} else {
29842979
// Mark as added.
29852980
lp.kp = true
@@ -3135,9 +3130,9 @@ func (n *raft) trackPeer(peer string) error {
31353130
}
31363131
}
31373132
if ps := n.peers[peer]; ps != nil {
3138-
ps.ts = time.Now().UnixNano()
3133+
ps.ts = time.Now()
31393134
} else if !isRemoved {
3140-
n.peers[peer] = &lps{time.Now().UnixNano(), 0, false}
3135+
n.peers[peer] = &lps{time.Now(), 0, false}
31413136
}
31423137
n.Unlock()
31433138

@@ -3432,9 +3427,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
34323427
// Track leader directly
34333428
if isNew && ae.leader != noLeader {
34343429
if ps := n.peers[ae.leader]; ps != nil {
3435-
ps.ts = time.Now().UnixNano()
3430+
ps.ts = time.Now()
34363431
} else {
3437-
n.peers[ae.leader] = &lps{time.Now().UnixNano(), 0, true}
3432+
n.peers[ae.leader] = &lps{time.Now(), 0, true}
34383433
}
34393434
}
34403435

@@ -3695,9 +3690,9 @@ CONTINUE:
36953690
if newPeer := string(e.Data); len(newPeer) == idLen {
36963691
// Track directly, but wait for commit to be official
36973692
if ps := n.peers[newPeer]; ps != nil {
3698-
ps.ts = time.Now().UnixNano()
3693+
ps.ts = time.Now()
36993694
} else {
3700-
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0, false}
3695+
n.peers[newPeer] = &lps{time.Now(), 0, false}
37013696
}
37023697
// Store our peer in our global peer map for all peers.
37033698
peers.LoadOrStore(newPeer, newPeer)
@@ -3754,7 +3749,7 @@ func (n *raft) processPeerState(ps *peerState) {
37543749
lp.kp = true
37553750
n.peers[peer] = lp
37563751
} else {
3757-
n.peers[peer] = &lps{0, 0, true}
3752+
n.peers[peer] = &lps{time.Time{}, 0, true}
37583753
}
37593754
}
37603755
n.debug("Update peers from leader to %+v", n.peers)

0 commit comments

Comments
 (0)