query: ip diversity filter #1070

Merged (7 commits) on Apr 2, 2025
13 changes: 13 additions & 0 deletions amino/defaults.go
@@ -47,6 +47,19 @@ const (
// After it expires, the returned records will require an extra lookup, to
// find the multiaddress associated with the returned peer id.
DefaultProviderAddrTTL = 24 * time.Hour

// DefaultMaxPeersPerIPGroup is the maximal number of peers with addresses in
// the same IP group allowed in the routing table. Once this limit is
// reached, newly discovered peers with addresses in the same IP group will
// not be added to the routing table.
DefaultMaxPeersPerIPGroup = 3

// DefaultMaxPeersPerIPGroupPerCpl is the maximal number of peers with
// addresses in the same IP group allowed in each routing table bucket,
// defined by its common prefix length to the self peer id. The maximal
// number of peers with addresses in the same IP group allowed in the whole
// routing table is defined by DefaultMaxPeersPerIPGroup.
DefaultMaxPeersPerIPGroupPerCpl = 2
)

// Protocols is a slice containing all supported protocol IDs for Amino DHT.
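For context, a minimal sketch (not part of this diff; the helper name newDiversityFilteredDHT is made up) of how these defaults can be plugged into a routing table diversity filter, mirroring the dual/dual.go change below:

```go
package main

import (
	"context"

	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p-kad-dht/amino"
)

// newDiversityFilteredDHT builds a DHT whose routing table accepts at most
// amino.DefaultMaxPeersPerIPGroupPerCpl peers per IP group in each bucket and
// amino.DefaultMaxPeersPerIPGroup peers per IP group in the whole table.
func newDiversityFilteredDHT(ctx context.Context) (*dht.IpfsDHT, error) {
	h, err := libp2p.New()
	if err != nil {
		return nil, err
	}
	return dht.New(ctx, h,
		dht.RoutingTablePeerDiversityFilter(
			dht.NewRTPeerDiversityFilter(h, amino.DefaultMaxPeersPerIPGroupPerCpl, amino.DefaultMaxPeersPerIPGroup),
		),
	)
}
```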
8 changes: 2 additions & 6 deletions dual/dual.go
@@ -9,6 +9,7 @@ import (

"github.com/ipfs/go-cid"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/amino"
"github.com/libp2p/go-libp2p-kad-dht/internal"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
@@ -49,11 +50,6 @@ var (
_ routing.ValueStore = (*DHT)(nil)
)

var (
maxPrefixCountPerCpl = 2
maxPrefixCount = 3
)

type config struct {
wan, lan []dht.Option
}
@@ -106,7 +102,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*DHT, error) {
WanDHTOption(
dht.QueryFilter(dht.PublicQueryFilter),
dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, maxPrefixCountPerCpl, maxPrefixCount)),
dht.RoutingTablePeerDiversityFilter(dht.NewRTPeerDiversityFilter(h, amino.DefaultMaxPeersPerIPGroupPerCpl, amino.DefaultMaxPeersPerIPGroup)),
// filter out all private addresses
dht.AddressFilter(func(addrs []ma.Multiaddr) []ma.Multiaddr { return ma.FilterAddrs(addrs, manet.IsPublicAddr) }),
),
114 changes: 83 additions & 31 deletions fullrt/dht.go
@@ -5,15 +5,18 @@ import (
"context"
"errors"
"fmt"
"maps"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/multiformats/go-base32"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/multiformats/go-multihash"

"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
"github.com/libp2p/go-libp2p-routing-helpers/tracing"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
@@ -32,6 +35,7 @@ import (
"google.golang.org/protobuf/proto"

kaddht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/amino"
"github.com/libp2p/go-libp2p-kad-dht/crawler"
"github.com/libp2p/go-libp2p-kad-dht/internal"
internalConfig "github.com/libp2p/go-libp2p-kad-dht/internal/config"
@@ -113,6 +117,8 @@ type FullRT struct {
self peer.ID

peerConnectednessSubscriber event.Subscription

ipDiversityFilterLimit int
}

// NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
@@ -127,10 +133,11 @@ type FullRT struct {
// bootstrap peers).
func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*FullRT, error) {
fullrtcfg := config{
crawlInterval: time.Hour,
bulkSendParallelism: 20,
waitFrac: 0.3,
timeoutPerOp: 5 * time.Second,
crawlInterval: time.Hour,
bulkSendParallelism: 20,
waitFrac: 0.3,
timeoutPerOp: 5 * time.Second,
ipDiversityFilterLimit: amino.DefaultMaxPeersPerIPGroup,
}
if err := fullrtcfg.apply(options...); err != nil {
return nil, err
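For reference, a minimal construction sketch (not part of this diff; newFullRTClient is a made-up helper and the in-memory datastore stands in for a real one) showing NewFullRT picking up these defaults, including ipDiversityFilterLimit = amino.DefaultMaxPeersPerIPGroup:

```go
package main

import (
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/libp2p/go-libp2p"
	kaddht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p-kad-dht/fullrt"
)

// newFullRTClient builds an accelerated DHT client with the default config
// above; bootstrap peers are omitted for brevity.
func newFullRTClient() (*fullrt.FullRT, error) {
	h, err := libp2p.New()
	if err != nil {
		return nil, err
	}
	return fullrt.NewFullRT(h, kaddht.DefaultPrefix,
		fullrt.DHTOption(
			kaddht.Datastore(dssync.MutexWrap(ds.NewMapDatastore())),
			kaddht.BucketSize(20),
		),
	)
}
```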
@@ -156,7 +163,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*FullRT, error) {
return nil, err
}

ms := net.NewMessageSenderImpl(h, []protocol.ID{dhtcfg.ProtocolPrefix + "/kad/1.0.0"})
ms := net.NewMessageSenderImpl(h, amino.Protocols)
protoMessenger, err := dht_pb.NewProtocolMessenger(ms)
if err != nil {
return nil, err
@@ -269,9 +276,7 @@ func (dht *FullRT) Stat() map[string]peer.ID {
newMap := make(map[string]peer.ID)

dht.kMapLk.RLock()
for k, v := range dht.keyToPeerMap {
newMap[k] = v
}
maps.Copy(newMap, dht.keyToPeerMap)
dht.kMapLk.RUnlock()
return newMap
}
@@ -449,7 +454,7 @@ func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int) {
func workers(numWorkers int, fn func(peer.AddrInfo), inputs <-chan peer.AddrInfo) {
jobs := make(chan peer.AddrInfo)
defer close(jobs)
for i := 0; i < numWorkers; i++ {
for range numWorkers {
go func() {
for j := range jobs {
fn(j)
@@ -461,30 +466,77 @@ func workers(numWorkers int, fn func(peer.AddrInfo), inputs <-chan peer.AddrInfo) {
}
}

// GetClosestPeers tries to return the `dht.bucketSize` closest known peers to
// the given key.
//
// If the IP diversity filter limit is set, the returned peers will contain at
// most `dht.ipDiversityFilterLimit` peers sharing the same IP group. Hence,
// the peers may not be the absolute closest peers to the given key, but they
// will be more diverse in terms of IP addresses.
func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
_, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
defer span.End()

kbID := kb.ConvertKey(key)
kadKey := kadkey.KbucketIDToKey(kbID)
dht.rtLk.RLock()
closestKeys := kademlia.ClosestN(kadKey, dht.rt, dht.bucketSize)
dht.rtLk.RUnlock()

peers := make([]peer.ID, 0, len(closestKeys))
for _, k := range closestKeys {
dht.kMapLk.RLock()
p, ok := dht.keyToPeerMap[string(k)]
if !ok {
logger.Errorf("key not found in map")
}
dht.kMapLk.RUnlock()
dht.peerAddrsLk.RLock()
peerAddrs := dht.peerAddrs[p]
dht.peerAddrsLk.RUnlock()
ipGroupCounts := make(map[peerdiversity.PeerIPGroupKey]map[peer.ID]struct{})
peers := make([]peer.ID, 0, dht.bucketSize)

// If ipDiversityFilterLimit is non-zero, the step is slightly larger than
// the bucket size, so that a few backup peers are available in case some
// are filtered out by the diversity filter. Multiple calls to ClosestN are
// expensive, but increasing the `count` parameter is cheap.
step := dht.bucketSize + 2*dht.ipDiversityFilterLimit
for nClosest := 0; nClosest < dht.rt.Size(); nClosest += step {
dht.rtLk.RLock()
// Take the next `step` closest peers, skipping the `nClosest` peers already tried
closestKeys := kademlia.ClosestN(kadKey, dht.rt, nClosest+step)[nClosest:]
dht.rtLk.RUnlock()

PeersLoop:
for _, k := range closestKeys {
dht.kMapLk.RLock()
// Recover the peer ID from the key
p, ok := dht.keyToPeerMap[string(k)]
if !ok {
logger.Errorf("key not found in map")
}
dht.kMapLk.RUnlock()
dht.peerAddrsLk.RLock()
peerAddrs := dht.peerAddrs[p]
dht.peerAddrsLk.RUnlock()

if dht.ipDiversityFilterLimit > 0 {
for _, addr := range peerAddrs {
ip, err := manet.ToIP(addr)
if err != nil {
continue
}
ipGroup := peerdiversity.IPGroupKey(ip)
if len(ipGroup) == 0 {
continue
}
if _, ok := ipGroupCounts[ipGroup]; !ok {
ipGroupCounts[ipGroup] = make(map[peer.ID]struct{})
}
if len(ipGroupCounts[ipGroup]) >= dht.ipDiversityFilterLimit {
// This ip group is already overrepresented, skip this peer
continue PeersLoop
}
ipGroupCounts[ipGroup][p] = struct{}{}
}
}

// Add the peer's known addresses to the peerstore so that it can be
// dialed by the caller.
dht.h.Peerstore().AddAddrs(p, peerAddrs, peerstore.TempAddrTTL)
peers = append(peers, p)

if len(peers) == dht.bucketSize {
return peers, nil
}
}
}
return peers, nil
}
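A hypothetical caller sketch (dialClosest is not part of this diff) illustrating the behaviour documented above: GetClosestPeers has already placed the returned peers' addresses in the peerstore with a temporary TTL, so the caller can dial them by ID, and with the default limit at most 3 of them share an IP group:

```go
package main

import (
	"context"

	"github.com/libp2p/go-libp2p-kad-dht/fullrt"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
)

// dialClosest looks up the closest peers to key and dials them, returning the
// subset that could be reached.
func dialClosest(ctx context.Context, frt *fullrt.FullRT, h host.Host, key string) ([]peer.ID, error) {
	closest, err := frt.GetClosestPeers(ctx, key)
	if err != nil {
		return nil, err
	}
	reached := make([]peer.ID, 0, len(closest))
	for _, p := range closest {
		// Addresses were added to the peerstore by GetClosestPeers, so
		// Connect only needs the peer ID.
		if err := h.Connect(ctx, peer.AddrInfo{ID: p}); err != nil {
			continue // best effort: skip unreachable peers
		}
		reached = append(reached, p)
	}
	return reached, nil
}
```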
@@ -615,7 +667,7 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
}

stopCh := make(chan struct{})
valCh, lookupRes := dht.getValues(ctx, key, stopCh)
valCh, lookupRes := dht.getValues(ctx, key)

out := make(chan []byte)
go func() {
@@ -743,7 +795,7 @@ type lookupWithFollowupResult struct {
peers []peer.ID // the top K not unreachable peers at the end of the query
}

func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
func (dht *FullRT) getValues(ctx context.Context, key string) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
valCh := make(chan RecvdVal, 1)
lookupResCh := make(chan *lookupWithFollowupResult, 1)

@@ -1004,7 +1056,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
keysAsPeerIDs = append(keysAsPeerIDs, peer.ID(k))
}

return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn, true)
return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn)
}

func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) error {
@@ -1035,10 +1087,10 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte) error {
return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
}

return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn, false)
return dht.bulkMessageSend(ctx, keysAsPeerIDs, fn)
}

func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error, isProvRec bool) error {
func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error) error {
ctx, span := internal.StartSpan(ctx, "FullRT.BulkMessageSend")
defer span.End()

@@ -1089,7 +1141,7 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(ctx context.Context, target, k peer.ID) error) error {
workCh := make(chan workMessage, 1)
wg := sync.WaitGroup{}
wg.Add(dht.bulkSendParallelism)
for i := 0; i < dht.bulkSendParallelism; i++ {
for range dht.bulkSendParallelism {
go func() {
defer wg.Done()
defer logger.Debugf("bulk send goroutine done")