Skip to content

Commit d2ee47e

Browse files
qdm12 and timwu20 authored
fix(lib/grandpa): capped number of tracked vote messages (#2485)
- Vote messages tracker
  - Removes oldest vote message when tracker capacity is reached
  - Efficient removal of multiple messages at any place in the tracker queue (linked list) if they get processed
  - Efficient removal of oldest message
  - Uses a bit more space to store each block hash + authority ID, for each vote message
  - Order is not modified for the same vote message (same block hash and authority id)
- Discard vote messages for more than 1 round in the future from the state round (thanks [andresilva](https://github.com/andresilva))
- Discard vote messages for more than 1 round in the past from the state round (thanks [andresilva](https://github.com/andresilva))
- Disable `addCatchUpResponse` (not implemented yet) to avoid a possible memory leak/abuse, see #1531
- Comment with issue number about the reputation change of peers for bad vote messages

Co-authored-by: Timothy Wu <[email protected]>
1 parent 1f20d98 commit d2ee47e

File tree

5 files changed: +625 additions, -97 deletions (lines changed)

lib/grandpa/message_tracker.go

Lines changed: 29 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ import (
99

1010
"github.com/ChainSafe/gossamer/dot/types"
1111
"github.com/ChainSafe/gossamer/lib/common"
12-
"github.com/ChainSafe/gossamer/lib/crypto/ed25519"
12+
"github.com/libp2p/go-libp2p-core/peer"
1313
)
1414

1515
// tracker keeps track of messages that have been received, but have failed to
@@ -18,8 +18,8 @@ import (
1818
type tracker struct {
1919
blockState BlockState
2020
handler *MessageHandler
21-
// map of vote block hash -> array of VoteMessages for that hash
22-
voteMessages map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage
21+
votes votesTracker
22+
2323
// map of commit block hash to commit message
2424
commitMessages map[common.Hash]*CommitMessage
2525
mapLock sync.Mutex
@@ -32,10 +32,11 @@ type tracker struct {
3232
}
3333

3434
func newTracker(bs BlockState, handler *MessageHandler) *tracker {
35+
const votesCapacity = 1000
3536
return &tracker{
3637
blockState: bs,
3738
handler: handler,
38-
voteMessages: make(map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage),
39+
votes: newVotesTracker(votesCapacity),
3940
commitMessages: make(map[common.Hash]*CommitMessage),
4041
mapLock: sync.Mutex{},
4142
in: bs.GetImportedBlockNotifierChannel(),
@@ -53,21 +54,15 @@ func (t *tracker) stop() {
5354
t.blockState.FreeImportedBlockNotifierChannel(t.in)
5455
}
5556

56-
func (t *tracker) addVote(v *networkVoteMessage) {
57-
if v.msg == nil {
57+
func (t *tracker) addVote(peerID peer.ID, message *VoteMessage) {
58+
if message == nil {
5859
return
5960
}
6061

6162
t.mapLock.Lock()
6263
defer t.mapLock.Unlock()
6364

64-
msgs, has := t.voteMessages[v.msg.Message.BlockHash]
65-
if !has {
66-
msgs = make(map[ed25519.PublicKeyBytes]*networkVoteMessage)
67-
t.voteMessages[v.msg.Message.BlockHash] = msgs
68-
}
69-
70-
msgs[v.msg.Message.AuthorityID] = v
65+
t.votes.add(peerID, message)
7166
}
7267

7368
func (t *tracker) addCommit(cm *CommitMessage) {
@@ -76,10 +71,11 @@ func (t *tracker) addCommit(cm *CommitMessage) {
7671
t.commitMessages[cm.Vote.Hash] = cm
7772
}
7873

79-
func (t *tracker) addCatchUpResponse(cr *CatchUpResponse) {
74+
func (t *tracker) addCatchUpResponse(_ *CatchUpResponse) {
8075
t.catchUpResponseMessageMutex.Lock()
8176
defer t.catchUpResponseMessageMutex.Unlock()
82-
t.catchUpResponseMessages[cr.Round] = cr
77+
// uncomment when usage is setup properly, see #1531
78+
// t.catchUpResponseMessages[cr.Round] = cr
8379
}
8480

8581
func (t *tracker) handleBlocks() {
@@ -108,18 +104,18 @@ func (t *tracker) handleBlock(b *types.Block) {
108104
defer t.mapLock.Unlock()
109105

110106
h := b.Header.Hash()
111-
if vms, has := t.voteMessages[h]; has {
112-
for _, v := range vms {
113-
// handleMessage would never error for vote message
114-
_, err := t.handler.handleMessage(v.from, v.msg)
115-
if err != nil {
116-
logger.Warnf("failed to handle vote message %v: %s", v, err)
117-
}
107+
vms := t.votes.messages(h)
108+
for _, v := range vms {
109+
// handleMessage would never error for vote message
110+
_, err := t.handler.handleMessage(v.from, v.msg)
111+
if err != nil {
112+
logger.Warnf("failed to handle vote message %v: %s", v, err)
118113
}
119-
120-
delete(t.voteMessages, h)
121114
}
122115

116+
// delete block hash that may or may not be in the tracker.
117+
t.votes.delete(h)
118+
123119
if cm, has := t.commitMessages[h]; has {
124120
_, err := t.handler.handleMessage("", cm)
125121
if err != nil {
@@ -134,17 +130,17 @@ func (t *tracker) handleTick() {
134130
t.mapLock.Lock()
135131
defer t.mapLock.Unlock()
136132

137-
for _, vms := range t.voteMessages {
138-
for _, v := range vms {
133+
for _, networkVoteMessage := range t.votes.networkVoteMessages() {
134+
peerID := networkVoteMessage.from
135+
message := networkVoteMessage.msg
136+
_, err := t.handler.handleMessage(peerID, message)
137+
if err != nil {
139138
// handleMessage would never error for vote message
140-
_, err := t.handler.handleMessage(v.from, v.msg)
141-
if err != nil {
142-
logger.Debugf("failed to handle vote message %v: %s", v, err)
143-
}
139+
logger.Debugf("failed to handle vote message %v from peer id %s: %s", message, peerID, err)
140+
}
144141

145-
if v.msg.Round < t.handler.grandpa.state.round && v.msg.SetID == t.handler.grandpa.state.setID {
146-
delete(t.voteMessages, v.msg.Message.BlockHash)
147-
}
142+
if message.Round < t.handler.grandpa.state.round && message.SetID == t.handler.grandpa.state.setID {
143+
t.votes.delete(message.Message.BlockHash)
148144
}
149145
}
150146

lib/grandpa/message_tracker_test.go

Lines changed: 37 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,24 @@ import (
1616
"github.com/stretchr/testify/require"
1717
)
1818

19+
// getMessageFromVotesTracker returns the vote message
20+
// from the votes tracker for the given block hash and authority ID.
21+
func getMessageFromVotesTracker(votes votesTracker,
22+
blockHash common.Hash, authorityID ed25519.PublicKeyBytes) (
23+
message *VoteMessage) {
24+
authorityIDToElement, has := votes.mapping[blockHash]
25+
if !has {
26+
return nil
27+
}
28+
29+
element, ok := authorityIDToElement[authorityID]
30+
if !ok {
31+
return nil
32+
}
33+
34+
return element.Value.(networkVoteMessage).msg
35+
}
36+
1937
func TestMessageTracker_ValidateMessage(t *testing.T) {
2038
kr, err := keystore.NewEd25519Keyring()
2139
require.NoError(t, err)
@@ -33,13 +51,11 @@ func TestMessageTracker_ValidateMessage(t *testing.T) {
3351
require.NoError(t, err)
3452
gs.keypair = kr.Bob().(*ed25519.Keypair)
3553

36-
expected := &networkVoteMessage{
37-
msg: msg,
38-
}
39-
4054
_, err = gs.validateVoteMessage("", msg)
4155
require.Equal(t, err, ErrBlockDoesNotExist)
42-
require.Equal(t, expected, gs.tracker.voteMessages[fake.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])
56+
authorityID := kr.Alice().Public().(*ed25519.PublicKey).AsBytes()
57+
voteMessage := getMessageFromVotesTracker(gs.tracker.votes, fake.Hash(), authorityID)
58+
require.Equal(t, msg, voteMessage)
4359
}
4460

4561
func TestMessageTracker_SendMessage(t *testing.T) {
@@ -72,13 +88,11 @@ func TestMessageTracker_SendMessage(t *testing.T) {
7288
require.NoError(t, err)
7389
gs.keypair = kr.Bob().(*ed25519.Keypair)
7490

75-
expected := &networkVoteMessage{
76-
msg: msg,
77-
}
78-
7991
_, err = gs.validateVoteMessage("", msg)
8092
require.Equal(t, err, ErrBlockDoesNotExist)
81-
require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])
93+
authorityID := kr.Alice().Public().(*ed25519.PublicKey).AsBytes()
94+
voteMessage := getMessageFromVotesTracker(gs.tracker.votes, next.Hash(), authorityID)
95+
require.Equal(t, msg, voteMessage)
8296

8397
err = gs.blockState.(*state.BlockState).AddBlock(&types.Block{
8498
Header: *next,
@@ -126,13 +140,11 @@ func TestMessageTracker_ProcessMessage(t *testing.T) {
126140
require.NoError(t, err)
127141
gs.keypair = kr.Bob().(*ed25519.Keypair)
128142

129-
expected := &networkVoteMessage{
130-
msg: msg,
131-
}
132-
133143
_, err = gs.validateVoteMessage("", msg)
134144
require.Equal(t, ErrBlockDoesNotExist, err)
135-
require.Equal(t, expected, gs.tracker.voteMessages[next.Hash()][kr.Alice().Public().(*ed25519.PublicKey).AsBytes()])
145+
authorityID := kr.Alice().Public().(*ed25519.PublicKey).AsBytes()
146+
voteMessage := getMessageFromVotesTracker(gs.tracker.votes, next.Hash(), authorityID)
147+
require.Equal(t, msg, voteMessage)
136148

137149
err = gs.blockState.(*state.BlockState).AddBlock(&types.Block{
138150
Header: *next,
@@ -147,7 +159,7 @@ func TestMessageTracker_ProcessMessage(t *testing.T) {
147159
}
148160
pv, has := gs.prevotes.Load(kr.Alice().Public().(*ed25519.PublicKey).AsBytes())
149161
require.True(t, has)
150-
require.Equal(t, expectedVote, &pv.(*SignedVote).Vote, gs.tracker.voteMessages)
162+
require.Equal(t, expectedVote, &pv.(*SignedVote).Vote, gs.tracker.votes)
151163
}
152164

153165
func TestMessageTracker_MapInsideMap(t *testing.T) {
@@ -163,24 +175,19 @@ func TestMessageTracker_MapInsideMap(t *testing.T) {
163175
}
164176

165177
hash := header.Hash()
166-
_, ok := gs.tracker.voteMessages[hash]
167-
require.False(t, ok)
178+
messages := gs.tracker.votes.messages(hash)
179+
require.Empty(t, messages)
168180

169181
gs.keypair = kr.Alice().(*ed25519.Keypair)
170182
authorityID := kr.Alice().Public().(*ed25519.PublicKey).AsBytes()
171183
_, msg, err := gs.createSignedVoteAndVoteMessage(NewVoteFromHeader(header), prevote)
172184
require.NoError(t, err)
173185
gs.keypair = kr.Bob().(*ed25519.Keypair)
174186

175-
gs.tracker.addVote(&networkVoteMessage{
176-
msg: msg,
177-
})
178-
179-
voteMsgs, ok := gs.tracker.voteMessages[hash]
180-
require.True(t, ok)
187+
gs.tracker.addVote("", msg)
181188

182-
_, ok = voteMsgs[authorityID]
183-
require.True(t, ok)
189+
voteMessage := getMessageFromVotesTracker(gs.tracker.votes, hash, authorityID)
190+
require.NotEmpty(t, voteMessage)
184191
}
185192

186193
func TestMessageTracker_handleTick(t *testing.T) {
@@ -197,9 +204,7 @@ func TestMessageTracker_handleTick(t *testing.T) {
197204
BlockHash: testHash,
198205
},
199206
}
200-
gs.tracker.addVote(&networkVoteMessage{
201-
msg: msg,
202-
})
207+
gs.tracker.addVote("", msg)
203208

204209
gs.tracker.handleTick()
205210

@@ -212,7 +217,7 @@ func TestMessageTracker_handleTick(t *testing.T) {
212217
}
213218

214219
// shouldn't be deleted as round in message >= grandpa round
215-
require.Equal(t, 1, len(gs.tracker.voteMessages[testHash]))
220+
require.Len(t, gs.tracker.votes.messages(testHash), 1)
216221

217222
gs.state.round = 1
218223
msg = &VoteMessage{
@@ -221,9 +226,7 @@ func TestMessageTracker_handleTick(t *testing.T) {
221226
BlockHash: testHash,
222227
},
223228
}
224-
gs.tracker.addVote(&networkVoteMessage{
225-
msg: msg,
226-
})
229+
gs.tracker.addVote("", msg)
227230

228231
gs.tracker.handleTick()
229232

@@ -235,5 +238,5 @@ func TestMessageTracker_handleTick(t *testing.T) {
235238
}
236239

237240
// should be deleted as round in message < grandpa round
238-
require.Empty(t, len(gs.tracker.voteMessages[testHash]))
241+
require.Empty(t, gs.tracker.votes.messages(testHash))
239242
}

lib/grandpa/vote_message.go

Lines changed: 46 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -126,51 +126,70 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro
126126
// check for message signature
127127
pk, err := ed25519.NewPublicKey(m.Message.AuthorityID[:])
128128
if err != nil {
129+
// TODO Affect peer reputation
130+
// https://github.com/ChainSafe/gossamer/issues/2505
129131
return nil, err
130132
}
131133

132134
err = validateMessageSignature(pk, m)
133135
if err != nil {
136+
// TODO Affect peer reputation
137+
// https://github.com/ChainSafe/gossamer/issues/2505
134138
return nil, err
135139
}
136140

137141
if m.SetID != s.state.setID {
138142
return nil, ErrSetIDMismatch
139143
}
140144

141-
// check that vote is for current round
142-
if m.Round != s.state.round {
143-
if m.Round < s.state.round {
144-
// peer doesn't know round was finalised, send out another commit message
145-
header, err := s.blockState.GetFinalisedHeader(m.Round, m.SetID)
146-
if err != nil {
147-
return nil, err
148-
}
145+
const maxRoundsLag = 1
146+
minRoundAccepted := s.state.round - maxRoundsLag
147+
if minRoundAccepted > s.state.round {
148+
// we overflowed below 0 so set the minimum to 0.
149+
minRoundAccepted = 0
150+
}
149151

150-
cm, err := s.newCommitMessage(header, m.Round)
151-
if err != nil {
152-
return nil, err
153-
}
152+
const maxRoundsAhead = 1
153+
maxRoundAccepted := s.state.round + maxRoundsAhead
154154

155-
// send finalised block from previous round to network
156-
msg, err := cm.ToConsensusMessage()
157-
if err != nil {
158-
return nil, err
159-
}
155+
if m.Round < minRoundAccepted || m.Round > maxRoundAccepted {
156+
// Discard message
157+
// TODO: affect peer reputation, this is shameful impolite behaviour
158+
// https://github.com/ChainSafe/gossamer/issues/2505
159+
return nil, nil //nolint:nilnil
160+
}
160161

161-
if err = s.network.SendMessage(from, msg); err != nil {
162-
logger.Warnf("failed to send CommitMessage: %s", err)
163-
}
164-
} else {
165-
// round is higher than ours, perhaps we are behind. store vote in tracker for now
166-
s.tracker.addVote(&networkVoteMessage{
167-
from: from,
168-
msg: m,
169-
})
162+
if m.Round < s.state.round {
163+
// message round is lagging by 1
164+
// peer doesn't know round was finalised, send out another commit message
165+
header, err := s.blockState.GetFinalisedHeader(m.Round, m.SetID)
166+
if err != nil {
167+
return nil, err
168+
}
169+
170+
cm, err := s.newCommitMessage(header, m.Round)
171+
if err != nil {
172+
return nil, err
173+
}
174+
175+
// send finalised block from previous round to network
176+
msg, err := cm.ToConsensusMessage()
177+
if err != nil {
178+
return nil, err
179+
}
180+
181+
if err = s.network.SendMessage(from, msg); err != nil {
182+
logger.Warnf("failed to send CommitMessage: %s", err)
170183
}
171184

172185
// TODO: get justification if your round is lower, or just do catch-up? (#1815)
173186
return nil, errRoundMismatch(m.Round, s.state.round)
187+
} else if m.Round > s.state.round {
188+
// Message round is higher by 1 than the round of our state,
189+
// we may be lagging behind, so store the message in the tracker
190+
// for processing later in the coming few milliseconds.
191+
s.tracker.addVote(from, m)
192+
return nil, errRoundMismatch(m.Round, s.state.round)
174193
}
175194

176195
// check for equivocation ie. multiple votes within one subround
@@ -192,10 +211,7 @@ func (s *Service) validateVoteMessage(from peer.ID, m *VoteMessage) (*Vote, erro
192211
errors.Is(err, blocktree.ErrDescendantNotFound) ||
193212
errors.Is(err, blocktree.ErrEndNodeNotFound) ||
194213
errors.Is(err, blocktree.ErrStartNodeNotFound) {
195-
s.tracker.addVote(&networkVoteMessage{
196-
from: from,
197-
msg: m,
198-
})
214+
s.tracker.addVote(from, m)
199215
}
200216
if err != nil {
201217
return nil, err

0 commit comments

Comments
 (0)