Commit f0eac34

query: ip diversity filter (#1070)
* query: ip diversity filter
* update go-libp2p-kbucket
* moving filter func to rt_diversity_filter.go
* addressing reviews
* ip diversity in fullrt GetClosestPeers
* test
* addressing review
1 parent e676e51 commit f0eac34

File tree

10 files changed: +601 additions, -117 deletions


amino/defaults.go

Lines changed: 12 additions & 0 deletions
@@ -47,6 +47,18 @@ const (
 	// After it expires, the returned records will require an extra lookup, to
 	// find the multiaddress associated with the returned peer id.
 	DefaultProviderAddrTTL = 24 * time.Hour
+
+	// DefaultMaxPeersPerIPGroup is the maximal number of peers with addresses in
+	// the same IP group allowed in the routing table. Once this limit is
+	// reached, newly discovered peers with addresses in the same IP group will
+	// not be added to the routing table.
+	DefaultMaxPeersPerIPGroup = 3
+
+	// DefaultMaxPeersPerIPGroupPerCpl is the maximal number of peers with
+	// addresses in the same IP group allowed in each routing table bucket,
+	// defined by its common prefix length to the self peer id.
+	// Also see: `DefaultMaxPeersPerIPGroup`.
+	DefaultMaxPeersPerIPGroupPerCpl = 2
 )

 // Protocols is a slice containing all supported protocol IDs for Amino DHT.
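To make these limits concrete, here is a minimal sketch of a table-wide admission check built on the same primitives this commit uses in fullrt/dht.go: peerdiversity.IPGroupKey from go-libp2p-kbucket plus the new amino default. The admit helper is hypothetical (it is not code from this commit), and the sketch assumes the example addresses all map to one IP group.

package main

import (
	"fmt"
	"net"

	"github.com/libp2p/go-libp2p-kad-dht/amino"
	"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
)

func main() {
	// Number of admitted peers per IP group, capped at the amino default.
	groupCounts := make(map[peerdiversity.PeerIPGroupKey]int)

	// admit is a hypothetical helper mirroring the table-wide limit.
	admit := func(ip net.IP) bool {
		g := peerdiversity.IPGroupKey(ip)
		if len(g) == 0 {
			return false // no usable group key, reject conservatively
		}
		if groupCounts[g] >= amino.DefaultMaxPeersPerIPGroup {
			return false // group already holds the maximum of 3 peers
		}
		groupCounts[g]++
		return true
	}

	// Assuming these addresses fall into the same IP group, only the first
	// DefaultMaxPeersPerIPGroup (3) of them are admitted.
	for _, s := range []string{"203.0.113.1", "203.0.113.2", "203.0.113.3", "203.0.113.4"} {
		fmt.Printf("%s admitted: %v\n", s, admit(net.ParseIP(s)))
	}
}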

dual/dual.go

Lines changed: 2 additions & 6 deletions
@@ -9,6 +9,7 @@ import (

 	"github.com/ipfs/go-cid"
 	dht "github.com/libp2p/go-libp2p-kad-dht"
+	"github.com/libp2p/go-libp2p-kad-dht/amino"
 	"github.com/libp2p/go-libp2p-kad-dht/internal"
 	kb "github.com/libp2p/go-libp2p-kbucket"
 	"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
@@ -49,11 +50,6 @@ var (
 	_ routing.ValueStore = (*DHT)(nil)
 )

-var (
-	maxPrefixCountPerCpl = 2
-	maxPrefixCount       = 3
-)
-
 type config struct {
 	wan, lan []dht.Option
 }
@@ -106,7 +102,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*DHT, error) {
 		WanDHTOption(
 			dht.QueryFilter(dht.PublicQueryFilter),
 			dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
-			dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)),
+			dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, amino.DefaultMaxPeersPerIPGroupPerCpl, amino.DefaultMaxPeersPerIPGroup)),
 			// filter out all private addresses
 			dht.AddressFilter(func(addrs []ma.Multiaddr) []ma.Multiaddr { return ma.FilterAddrs(addrs, manet.IsPublicAddr) }),
 		),
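For comparison, a standalone (non-dual) DHT can opt into the same filter explicitly. This is a sketch under standard go-libp2p setup assumptions, with error handling abbreviated; the options shown are the same ones the dual/dual.go change wires up for its WAN half.

package main

import (
	"context"

	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p-kad-dht/amino"
)

func main() {
	ctx := context.Background()
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}

	// Cap peers per IP group using the shared amino defaults instead of
	// package-local magic numbers, mirroring the dual/dual.go change above.
	d, err := dht.New(ctx, h,
		dht.RoutingTablePeerDiversityFilter(
			dht.NewRTPeerDiversityFilter(h, amino.DefaultMaxPeersPerIPGroupPerCpl, amino.DefaultMaxPeersPerIPGroup),
		),
	)
	if err != nil {
		panic(err)
	}
	defer d.Close()
}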

fullrt/dht.go

Lines changed: 85 additions & 35 deletions
@@ -5,15 +5,18 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"maps"
 	"math/rand"
 	"sync"
 	"sync/atomic"
 	"time"

 	"github.com/multiformats/go-base32"
 	ma "github.com/multiformats/go-multiaddr"
+	manet "github.com/multiformats/go-multiaddr/net"
 	"github.com/multiformats/go-multihash"

+	"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
 	"github.com/libp2p/go-libp2p-routing-helpers/tracing"
 	"github.com/libp2p/go-libp2p/core/event"
 	"github.com/libp2p/go-libp2p/core/host"
@@ -32,6 +35,7 @@ import (
 	"google.golang.org/protobuf/proto"

 	kaddht "github.com/libp2p/go-libp2p-kad-dht"
+	"github.com/libp2p/go-libp2p-kad-dht/amino"
 	"github.com/libp2p/go-libp2p-kad-dht/crawler"
 	"github.com/libp2p/go-libp2p-kad-dht/internal"
 	internalConfig "github.com/libp2p/go-libp2p-kad-dht/internal/config"
@@ -113,6 +117,8 @@ type FullRT struct {
 	self peer.ID

 	peerConnectednessSubscriber event.Subscription
+
+	ipDiversityFilterLimit int
 }

 // NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
@@ -127,10 +133,11 @@ type FullRT struct {
 // bootstrap peers).
 func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*FullRT, error) {
 	fullrtcfg := config{
-		crawlInterval:       time.Hour,
-		bulkSendParallelism: 20,
-		waitFrac:            0.3,
-		timeoutPerOp:        5 * time.Second,
+		crawlInterval:          time.Hour,
+		bulkSendParallelism:    20,
+		waitFrac:               0.3,
+		timeoutPerOp:           5 * time.Second,
+		ipDiversityFilterLimit: amino.DefaultMaxPeersPerIPGroup,
 	}
 	if err := fullrtcfg.apply(options...); err != nil {
 		return nil, err
@@ -156,7 +163,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
 		return nil, err
 	}

-	ms := net.NewMessageSenderImpl(h, []protocol.ID{dhtcfg.ProtocolPrefix + "/kad/1.0.0"})
+	ms := net.NewMessageSenderImpl(h, amino.Protocols)
 	protoMessenger, err := dht_pb.NewProtocolMessenger(ms)
 	if err != nil {
 		return nil, err
@@ -266,14 +273,9 @@ func (dht *FullRT) TriggerRefresh(ctx context.Context) error {
 }

 func (dht *FullRT) Stat() map[string]peer.ID {
-	newMap := make(map[string]peer.ID)
-
 	dht.kMapLk.RLock()
-	for k, v := range dht.keyToPeerMap {
-		newMap[k] = v
-	}
-	dht.kMapLk.RUnlock()
-	return newMap
+	defer dht.kMapLk.RUnlock()
+	return maps.Clone(dht.keyToPeerMap)
 }

 // Ready indicates that the routing table has been refreshed recently. It is recommended to be used for operations where
@@ -449,7 +451,7 @@ func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int)
 func workers(numWorkers int, fn func(peer.AddrInfo), inputs <-chan peer.AddrInfo) {
 	jobs := make(chan peer.AddrInfo)
 	defer close(jobs)
-	for i := 0; i < numWorkers; i++ {
+	for range numWorkers {
 		go func() {
 			for j := range jobs {
 				fn(j)
@@ -461,30 +463,78 @@ func workers(numWorkers int, fn func(peer.AddrInfo), inputs <-chan peer.AddrInfo
 	}
 }

+// GetClosestPeers tries to return the `dht.bucketSize` closest known peers to
+// the given key.
+//
+// If the IP diversity filter limit is set, the returned peers will contain at
+// most `dht.ipDiversityFilterLimit` peers sharing the same IP group. Hence,
+// the peers may not be the absolute closest peers to the given key, but they
+// will be more diverse in terms of IP addresses.
 func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
 	_, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
 	defer span.End()

 	kbID := kb.ConvertKey(key)
 	kadKey := kadkey.KbucketIDToKey(kbID)
-	dht.rtLk.RLock()
-	closestKeys := kademlia.ClosestN(kadKey, dht.rt, dht.bucketSize)
-	dht.rtLk.RUnlock()

-	peers := make([]peer.ID, 0, len(closestKeys))
-	for _, k := range closestKeys {
-		dht.kMapLk.RLock()
-		p, ok := dht.keyToPeerMap[string(k)]
-		if !ok {
-			logger.Errorf("key not found in map")
-		}
-		dht.kMapLk.RUnlock()
-		dht.peerAddrsLk.RLock()
-		peerAddrs := dht.peerAddrs[p]
-		dht.peerAddrsLk.RUnlock()
+	ipGroupCounts := make(map[peerdiversity.PeerIPGroupKey]map[peer.ID]struct{})
+	peers := make([]peer.ID, 0, dht.bucketSize)
+
+	// If ipDiversityFilterLimit is non-zero, the step is slightly larger than
+	// the bucket size, allowing for a few backup peers in case some are
+	// filtered out by the diversity filter. Multiple calls to ClosestN are
+	// expensive, but increasing the `count` parameter is cheap.
+	step := dht.bucketSize + 2*dht.ipDiversityFilterLimit
+	for nClosest := 0; nClosest < dht.rt.Size(); nClosest += step {
+		dht.rtLk.RLock()
+		// Get the next `step` closest peers, because we already tried the `nClosest` closest peers
+		closestKeys := kademlia.ClosestN(kadKey, dht.rt, nClosest+step)[nClosest:]
+		dht.rtLk.RUnlock()
+
+	PeersLoop:
+		for _, k := range closestKeys {
+			// Recover the peer ID from the key
+			dht.kMapLk.RLock()
+			p, ok := dht.keyToPeerMap[string(k)]
+			dht.kMapLk.RUnlock()
+			if !ok {
+				logger.Errorf("key not found in map")
+				continue
+			}
+			dht.peerAddrsLk.RLock()
+			peerAddrs := dht.peerAddrs[p]
+			dht.peerAddrsLk.RUnlock()
+
+			if dht.ipDiversityFilterLimit > 0 {
+				for _, addr := range peerAddrs {
+					ip, err := manet.ToIP(addr)
+					if err != nil {
+						continue
+					}
+					ipGroup := peerdiversity.IPGroupKey(ip)
+					if len(ipGroup) == 0 {
+						continue
+					}
+					if _, ok := ipGroupCounts[ipGroup]; !ok {
+						ipGroupCounts[ipGroup] = make(map[peer.ID]struct{})
+					}
+					if len(ipGroupCounts[ipGroup]) >= dht.ipDiversityFilterLimit {
+						// This IP group is already overrepresented, skip this peer
+						continue PeersLoop
+					}
+					ipGroupCounts[ipGroup][p] = struct{}{}
+				}
+			}
+
+			// Add the peer's known addresses to the peerstore so that it can be
+			// dialed by the caller.
+			dht.h.Peerstore().AddAddrs(p, peerAddrs, peerstore.TempAddrTTL)
+			peers = append(peers, p)

-		dht.h.Peerstore().AddAddrs(p, peerAddrs, peerstore.TempAddrTTL)
-		peers = append(peers, p)
+			if len(peers) == dht.bucketSize {
+				return peers, nil
+			}
+		}
 	}
 	return peers, nil
 }
@@ -615,7 +665,7 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
 	}

 	stopCh := make(chan struct{})
-	valCh, lookupRes := dht.getValues(ctx, key, stopCh)
+	valCh, lookupRes := dht.getValues(ctx, key)

 	out := make(chan []byte)
 	go func() {
@@ -743,7 +793,7 @@ type lookupWithFollowupResult struct {
 	peers []peer.ID // the top K not unreachable peers at the end of the query
 }

-func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
+func (dht *FullRT) getValues(ctx context.Context, key string) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
 	valCh := make(chan RecvdVal, 1)
 	lookupResCh := make(chan *lookupWithFollowupResult, 1)

@@ -1004,7 +1054,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
 		keysAsPeerIDs = append(keysAsPeerIDs, peer.ID(k))
 	}

-	return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn, true)
+	return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn)
 }

 func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) error {
@@ -1035,10 +1085,10 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte)
 		return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
 	}

-	return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn, false)
+	return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn)
 }

-func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error, isProvRec bool) error {
+func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error) error {
 	ctx, span := internal.StartSpan(ctx, "FullRT.BulkMessageSend")
 	defer span.End()

@@ -1089,7 +1139,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(
 	workCh := make(chan workMessage, 1)
 	wg := sync.WaitGroup{}
 	wg.Add(dht.bulkSendParallelism)
-	for i := 0; i < dht.bulkSendParallelism; i++ {
+	for range dht.bulkSendParallelism {
 		go func() {
 			defer wg.Done()
 			defer logger.Debugf("bulk send goroutine done")
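The selection strategy in GetClosestPeers is easier to see in isolation. The following sketch (hypothetical candidate and groupOf stand-ins, not code from this commit) applies the same windowed scan to a pre-sorted candidate list: take bucketSize + 2*limit candidates per window so each window carries spares, skip peers whose IP group is already at the limit, and stop as soon as bucketSize peers are collected.

package main

import "fmt"

// candidate stands in for a routing table entry; group stands in for the
// peerdiversity.PeerIPGroupKey derived from the peer's addresses.
type candidate struct {
	id    string
	group string
}

// selectDiverse scans candidates sorted by distance to the target and returns
// up to bucketSize peers with at most `limit` peers per IP group.
func selectDiverse(sorted []candidate, bucketSize, limit int) []string {
	picked := make([]string, 0, bucketSize)
	perGroup := make(map[string]int)

	// Each window holds a few spares in case some peers are filtered out.
	step := bucketSize + 2*limit
	for start := 0; start < len(sorted); start += step {
		end := min(start+step, len(sorted))
		for _, c := range sorted[start:end] {
			if limit > 0 && perGroup[c.group] >= limit {
				continue // group overrepresented, take a farther peer instead
			}
			perGroup[c.group]++
			picked = append(picked, c.id)
			if len(picked) == bucketSize {
				return picked
			}
		}
	}
	return picked // fewer than bucketSize sufficiently diverse peers exist
}

func main() {
	peers := []candidate{
		{"p1", "AS1"}, {"p2", "AS1"}, {"p3", "AS1"}, {"p4", "AS1"},
		{"p5", "AS2"}, {"p6", "AS3"}, {"p7", "AS2"},
	}
	// With limit 3, the fourth AS1 peer is skipped in favor of p7:
	// prints [p1 p2 p3 p5 p6 p7].
	fmt.Println(selectDiverse(peers, 6, 3))
}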

0 commit comments
