diff --git a/gossipsub.go b/gossipsub.go index d6041624..56b6886c 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -839,6 +839,11 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb. ihave := make(map[string]*pb.Message) for _, iwant := range ctl.GetIwant() { for _, mid := range iwant.GetMessageIDs() { + // Check if that peer has sent IDONTWANT before, if so don't send them the message + if _, ok := gs.unwanted[p][computeChecksum(mid)]; ok { + continue + } + msg, count, ok := gs.mcache.GetForPeer(mid, p) if !ok { continue diff --git a/gossipsub_test.go b/gossipsub_test.go index 93edeeca..675d164c 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -3079,6 +3079,110 @@ func TestGossipsubIdontwantSmallMessage(t *testing.T) { <-ctx.Done() } +// Test that IWANT will have no effect after IDONTWANT is sent +func TestGossipsubIdontwantBeforeIwant(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 3) + + msgID := func(pmsg *pb.Message) string { + // silly content-based test message-ID: just use the data as whole + return base64.URLEncoding.EncodeToString(pmsg.Data) + } + + psubs := make([]*PubSub, 2) + psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID)) + psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID)) + + topic := "foobar" + for _, ps := range psubs { + _, err := ps.Subscribe(topic) + if err != nil { + t.Fatal(err) + } + } + + // Wait a bit after the last message before checking the result + msgWaitMax := 2 * time.Second + msgTimer := time.NewTimer(msgWaitMax) + + // Checks we received right messages + msgReceived := false + ihaveReceived := false + checkMsgs := func() { + if msgReceived { + t.Fatalf("Expected no messages received after IDONWANT") + } + if !ihaveReceived { + t.Fatalf("Expected IHAVE received") + } + } + + // Wait for the timer to expire + go func() { + select { + case <-msgTimer.C: + checkMsgs() + cancel() + return + case <-ctx.Done(): + checkMsgs() + } + }() + + newMockGS(ctx, t, hosts[2], func(writeMsg func(*pb.RPC), irpc *pb.RPC) { + // Check if it receives any message + if len(irpc.GetPublish()) > 0 { + msgReceived = true + } + // The middle peer is supposed to send IHAVE + for _, ihave := range irpc.GetControl().GetIhave() { + ihaveReceived = true + mids := ihave.GetMessageIDs() + + writeMsg(&pb.RPC{ + Control: &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: mids}}}, + }) + // Wait for the middle peer to process IDONTWANT + time.Sleep(100 * time.Millisecond) + writeMsg(&pb.RPC{ + Control: &pb.ControlMessage{Iwant: []*pb.ControlIWant{{MessageIDs: mids}}}, + }) + } + // When the middle peer connects it will send us its subscriptions + for _, sub := range irpc.GetSubscriptions() { + if sub.GetSubscribe() { + // Reply by subcribing to the topic and pruning to the middle peer to make sure + // that it's not in the mesh + writeMsg(&pb.RPC{ + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, + Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicID: sub.Topicid}}}, + }) + + go func() { + // Wait for an interval to make sure the middle peer + // received and processed the subscribe + time.Sleep(100 * time.Millisecond) + + data := make([]byte, 16) + crand.Read(data) + + // Publish the message from the first peer + if err := psubs[0].Publish(topic, data); err != nil { + t.Error(err) + return // cannot call t.Fatal in a non-test goroutine + } + }() + } + } + }) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[1], hosts[2]) + + <-ctx.Done() +} + // Test that IDONTWANT will cleared when it's old enough func TestGossipsubIdontwantClear(t *testing.T) { ctx, cancel := context.WithCancel(context.Background())