Skip to content

Commit 3873331

Browse files
authored
Merge branch 'master' into cancel-iwant
2 parents bb11f5c + 0936035 commit 3873331

File tree

3 files changed

+111
-1
lines changed

3 files changed

+111
-1
lines changed

gossipsub.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ var (
6868
GossipSubGraftFloodThreshold = 10 * time.Second
6969
GossipSubMaxIHaveLength = 5000
7070
GossipSubMaxIHaveMessages = 10
71+
GossipSubMaxIDontWantLength = 10
7172
GossipSubMaxIDontWantMessages = 1000
7273
GossipSubIWantFollowupTime = 3 * time.Second
7374
GossipSubIDontWantMessageThreshold = 1024 // 1KB
@@ -218,6 +219,10 @@ type GossipSubParams struct {
218219
// MaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
219220
MaxIHaveMessages int
220221

222+
// MaxIDontWantLength is the maximum number of messages to include in an IDONTWANT message. Also controls
223+
// the maximum number of IDONTWANT ids we will accept to protect against IDONTWANT floods. This value
224+
// should be adjusted if your system anticipates a larger amount than specified per heartbeat.
225+
MaxIDontWantLength int
221226
// MaxIDontWantMessages is the maximum number of IDONTWANT messages to accept from a peer within a heartbeat.
222227
MaxIDontWantMessages int
223228

@@ -303,6 +308,7 @@ func DefaultGossipSubParams() GossipSubParams {
303308
GraftFloodThreshold: GossipSubGraftFloodThreshold,
304309
MaxIHaveLength: GossipSubMaxIHaveLength,
305310
MaxIHaveMessages: GossipSubMaxIHaveMessages,
311+
MaxIDontWantLength: GossipSubMaxIDontWantLength,
306312
MaxIDontWantMessages: GossipSubMaxIDontWantMessages,
307313
IWantFollowupTime: GossipSubIWantFollowupTime,
308314
IDontWantMessageThreshold: GossipSubIDontWantMessageThreshold,
@@ -1014,9 +1020,18 @@ func (gs *GossipSubRouter) handleIDontWant(p peer.ID, ctl *pb.ControlMessage) {
10141020
}
10151021
gs.peerdontwant[p]++
10161022

1023+
totalUnwantedIds := 0
10171024
// Remember all the unwanted message ids
1025+
mainIDWLoop:
10181026
for _, idontwant := range ctl.GetIdontwant() {
10191027
for _, mid := range idontwant.GetMessageIDs() {
1028+
// IDONTWANT flood protection
1029+
if totalUnwantedIds >= gs.params.MaxIDontWantLength {
1030+
log.Debugf("IDONWANT: peer %s has advertised too many ids (%d) within this message; ignoring", p, totalUnwantedIds)
1031+
break mainIDWLoop
1032+
}
1033+
1034+
totalUnwantedIds++
10201035
gs.unwanted[p][computeChecksum(mid)] = gs.params.IDontWantMessageTTL
10211036
}
10221037
}
@@ -1610,7 +1625,7 @@ func (gs *GossipSubRouter) heartbeat() {
16101625
}
16111626

16121627
// do we have too many peers?
1613-
if len(peers) > gs.params.Dhi {
1628+
if len(peers) >= gs.params.Dhi {
16141629
plst := peerMapToList(peers)
16151630

16161631
// sort by score (but shuffle first for the case we don't use the score)

gossipsub_spam_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/rand"
66
"encoding/base64"
7+
"fmt"
78
"strconv"
89
"sync"
910
"testing"
@@ -891,6 +892,53 @@ func TestGossipsubAttackSpamIDONTWANT(t *testing.T) {
891892
<-ctx.Done()
892893
}
893894

895+
func TestGossipsubHandleIDontwantSpam(t *testing.T) {
896+
ctx, cancel := context.WithCancel(context.Background())
897+
defer cancel()
898+
hosts := getDefaultHosts(t, 2)
899+
900+
msgID := func(pmsg *pb.Message) string {
901+
// silly content-based test message-ID: just use the data as whole
902+
return base64.URLEncoding.EncodeToString(pmsg.Data)
903+
}
904+
905+
psubs := make([]*PubSub, 2)
906+
psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID))
907+
psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID))
908+
909+
connect(t, hosts[0], hosts[1])
910+
911+
topic := "foobar"
912+
for _, ps := range psubs {
913+
_, err := ps.Subscribe(topic)
914+
if err != nil {
915+
t.Fatal(err)
916+
}
917+
}
918+
exceededIDWLength := GossipSubMaxIDontWantLength + 1
919+
var idwIds []string
920+
for i := 0; i < exceededIDWLength; i++ {
921+
idwIds = append(idwIds, fmt.Sprintf("idontwant-%d", i))
922+
}
923+
rPid := hosts[1].ID()
924+
ctrlMessage := &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: idwIds}}}
925+
grt := psubs[0].rt.(*GossipSubRouter)
926+
grt.handleIDontWant(rPid, ctrlMessage)
927+
928+
if grt.peerdontwant[rPid] != 1 {
929+
t.Errorf("Wanted message count of %d but received %d", 1, grt.peerdontwant[rPid])
930+
}
931+
mid := fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength-1)
932+
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; !ok {
933+
t.Errorf("Desired message id was not stored in the unwanted map: %s", mid)
934+
}
935+
936+
mid = fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength)
937+
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; ok {
938+
t.Errorf("Unwanted message id was stored in the unwanted map: %s", mid)
939+
}
940+
}
941+
894942
type mockGSOnRead func(writeMsg func(*pb.RPC), irpc *pb.RPC)
895943

896944
func newMockGS(ctx context.Context, t *testing.T, attacker host.Host, onReadMsg mockGSOnRead) {

gossipsub_test.go

+47
Original file line numberDiff line numberDiff line change
@@ -3280,6 +3280,53 @@ func TestGossipsubIdontwantClear(t *testing.T) {
32803280
<-ctx.Done()
32813281
}
32823282

3283+
func TestGossipsubPruneMeshCorrectly(t *testing.T) {
3284+
ctx, cancel := context.WithCancel(context.Background())
3285+
defer cancel()
3286+
hosts := getDefaultHosts(t, 9)
3287+
3288+
msgID := func(pmsg *pb.Message) string {
3289+
// silly content-based test message-ID: just use the data as whole
3290+
return base64.URLEncoding.EncodeToString(pmsg.Data)
3291+
}
3292+
3293+
params := DefaultGossipSubParams()
3294+
params.Dhi = 8
3295+
3296+
psubs := make([]*PubSub, 9)
3297+
for i := 0; i < 9; i++ {
3298+
psubs[i] = getGossipsub(ctx, hosts[i],
3299+
WithGossipSubParams(params),
3300+
WithMessageIdFn(msgID))
3301+
}
3302+
3303+
topic := "foobar"
3304+
for _, ps := range psubs {
3305+
_, err := ps.Subscribe(topic)
3306+
if err != nil {
3307+
t.Fatal(err)
3308+
}
3309+
}
3310+
3311+
// Connect first peer with the rest of the 8 other
3312+
// peers.
3313+
for i := 1; i < 9; i++ {
3314+
connect(t, hosts[0], hosts[i])
3315+
}
3316+
3317+
// Wait for 2 heartbeats to be able to prune excess peers back down to D.
3318+
totalTimeToWait := params.HeartbeatInitialDelay + 2*params.HeartbeatInterval
3319+
time.Sleep(totalTimeToWait)
3320+
3321+
meshPeers, ok := psubs[0].rt.(*GossipSubRouter).mesh[topic]
3322+
if !ok {
3323+
t.Fatal("mesh does not exist for topic")
3324+
}
3325+
if len(meshPeers) != params.D {
3326+
t.Fatalf("mesh does not have the correct number of peers. Wanted %d but got %d", params.D, len(meshPeers))
3327+
}
3328+
}
3329+
32833330
func BenchmarkAllocDoDropRPC(b *testing.B) {
32843331
gs := GossipSubRouter{tracer: &pubsubTracer{}}
32853332

0 commit comments

Comments
 (0)