Skip to content

Commit 070d085

Browse files
Merge pull request #1062 from libp2p/fullrt/cleanup
cleanup: fullrt
2 parents f23cbbb + 5417111 commit 070d085

File tree

1 file changed

+21
-43
lines changed

1 file changed

+21
-43
lines changed

fullrt/dht.go

Lines changed: 21 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/multiformats/go-base32"
14-
"github.com/multiformats/go-multiaddr"
14+
ma "github.com/multiformats/go-multiaddr"
1515
"github.com/multiformats/go-multihash"
1616

1717
"github.com/libp2p/go-libp2p-routing-helpers/tracing"
@@ -97,7 +97,7 @@ type FullRT struct {
9797
keyToPeerMap map[string]peer.ID
9898

9999
peerAddrsLk sync.RWMutex
100-
peerAddrs map[peer.ID][]multiaddr.Multiaddr
100+
peerAddrs map[peer.ID][]ma.Multiaddr
101101

102102
bootstrapPeers []*peer.AddrInfo
103103

@@ -208,7 +208,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
208208
keyToPeerMap: make(map[string]peer.ID),
209209
bucketSize: dhtcfg.BucketSize,
210210

211-
peerAddrs: make(map[peer.ID][]multiaddr.Multiaddr),
211+
peerAddrs: make(map[peer.ID][]ma.Multiaddr),
212212
bootstrapPeers: bsPeers,
213213

214214
triggerRefresh: make(chan struct{}),
@@ -229,11 +229,6 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
229229
return rt, nil
230230
}
231231

232-
type crawlVal struct {
233-
addrs []multiaddr.Multiaddr
234-
key kadkey.Key
235-
}
236-
237232
func (dht *FullRT) runSubscriber() {
238233
defer dht.wg.Done()
239234
ms, ok := dht.messageSender.(dht_pb.MessageSenderWithDisconnect)
@@ -310,7 +305,7 @@ func (dht *FullRT) runCrawler(ctx context.Context) {
310305
defer dht.wg.Done()
311306
t := time.NewTicker(dht.crawlerInterval)
312307

313-
foundPeers := make(map[peer.ID]*crawlVal)
308+
foundPeers := make(map[peer.ID][]ma.Multiaddr)
314309
foundPeersLk := sync.Mutex{}
315310

316311
initialTrigger := make(chan struct{}, 1)
@@ -334,36 +329,20 @@ func (dht *FullRT) runCrawler(ctx context.Context) {
334329
addrs = append(addrs, dht.bootstrapPeers...)
335330
dht.peerAddrsLk.Unlock()
336331

337-
for k := range foundPeers {
338-
delete(foundPeers, k)
339-
}
332+
clear(foundPeers)
340333

341334
start := time.Now()
342335
limitErrOnce := sync.Once{}
343336
dht.crawler.Run(ctx, addrs,
344337
func(p peer.ID, rtPeers []*peer.AddrInfo) {
345-
conns := dht.h.Network().ConnsToPeer(p)
346-
var addrs []multiaddr.Multiaddr
347-
for _, conn := range conns {
348-
addr := conn.RemoteMultiaddr()
349-
addrs = append(addrs, addr)
350-
}
351-
352-
if len(addrs) == 0 {
353-
logger.Debugf("no connections to %v after successful query. keeping addresses from the peerstore", p)
354-
addrs = dht.h.Peerstore().Addrs(p)
355-
}
356-
357338
keep := kaddht.PublicRoutingTableFilter(dht, p)
358339
if !keep {
359340
return
360341
}
361342

362343
foundPeersLk.Lock()
363344
defer foundPeersLk.Unlock()
364-
foundPeers[p] = &crawlVal{
365-
addrs: addrs,
366-
}
345+
foundPeers[p] = dht.h.Peerstore().Addrs(p)
367346
},
368347
func(p peer.ID, err error) {
369348
dialErr, ok := err.(*swarm.DialError)
@@ -382,14 +361,14 @@ func (dht *FullRT) runCrawler(ctx context.Context) {
382361
dur := time.Since(start)
383362
logger.Infof("crawl took %v", dur)
384363

385-
peerAddrs := make(map[peer.ID][]multiaddr.Multiaddr)
364+
peerAddrs := make(map[peer.ID][]ma.Multiaddr)
386365
kPeerMap := make(map[string]peer.ID)
387366
newRt := trie.New()
388-
for peerID, foundCrawlVal := range foundPeers {
389-
foundCrawlVal.key = kadkey.KbucketIDToKey(kb.ConvertPeerID(peerID))
390-
peerAddrs[peerID] = foundCrawlVal.addrs
391-
kPeerMap[string(foundCrawlVal.key)] = peerID
392-
newRt.Add(foundCrawlVal.key)
367+
for peerID, foundAddrs := range foundPeers {
368+
kadKey := kadkey.KbucketIDToKey(kb.ConvertPeerID(peerID))
369+
peerAddrs[peerID] = foundAddrs
370+
kPeerMap[string(kadKey)] = peerID
371+
newRt.Add(kadKey)
393372
}
394373

395374
dht.peerAddrsLk.Lock()
@@ -427,12 +406,12 @@ func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int)
427406
ctx, span := internal.StartSpan(ctx, "FullRT.CheckPeers", trace.WithAttributes(attribute.Int("NumPeers", len(peers))))
428407
defer span.End()
429408

430-
var peerAddrs chan interface{}
409+
var peerAddrs chan peer.AddrInfo
431410
var total int
432411
if len(peers) == 0 {
433412
dht.peerAddrsLk.RLock()
434413
total = len(dht.peerAddrs)
435-
peerAddrs = make(chan interface{}, total)
414+
peerAddrs = make(chan peer.AddrInfo, total)
436415
for k, v := range dht.peerAddrs {
437416
peerAddrs <- peer.AddrInfo{
438417
ID: k,
@@ -443,7 +422,7 @@ func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int)
443422
dht.peerAddrsLk.RUnlock()
444423
} else {
445424
total = len(peers)
446-
peerAddrs = make(chan interface{}, total)
425+
peerAddrs = make(chan peer.AddrInfo, total)
447426
dht.peerAddrsLk.RLock()
448427
for _, p := range peers {
449428
peerAddrs <- peer.AddrInfo{
@@ -457,19 +436,18 @@ func (dht *FullRT) CheckPeers(ctx context.Context, peers ...peer.ID) (int, int)
457436

458437
var success uint64
459438

460-
workers(100, func(i interface{}) {
461-
a := i.(peer.AddrInfo)
439+
workers(100, func(ai peer.AddrInfo) {
462440
dialctx, dialcancel := context.WithTimeout(ctx, time.Second*3)
463-
if err := dht.h.Connect(dialctx, a); err == nil {
441+
if err := dht.h.Connect(dialctx, ai); err == nil {
464442
atomic.AddUint64(&success, 1)
465443
}
466444
dialcancel()
467445
}, peerAddrs)
468446
return int(success), total
469447
}
470448

471-
func workers(numWorkers int, fn func(interface{}), inputs <-chan interface{}) {
472-
jobs := make(chan interface{})
449+
func workers(numWorkers int, fn func(peer.AddrInfo), inputs <-chan peer.AddrInfo) {
450+
jobs := make(chan peer.AddrInfo)
473451
defer close(jobs)
474452
for i := 0; i < numWorkers; i++ {
475453
go func() {
@@ -1436,7 +1414,7 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo,
14361414
defer cancelquery()
14371415

14381416
addrsCh := make(chan *peer.AddrInfo, 1)
1439-
newAddrs := make([]multiaddr.Multiaddr, 0)
1417+
newAddrs := make([]ma.Multiaddr, 0)
14401418

14411419
wg := sync.WaitGroup{}
14421420
wg.Add(1)
@@ -1596,7 +1574,7 @@ func (dht *FullRT) FindLocal(id peer.ID) peer.AddrInfo {
15961574
return peer.AddrInfo{}
15971575
}
15981576

1599-
func (dht *FullRT) maybeAddAddrs(p peer.ID, addrs []multiaddr.Multiaddr, ttl time.Duration) {
1577+
func (dht *FullRT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
16001578
// Don't add addresses for self or our connected peers. We have better ones.
16011579
if p == dht.h.ID() || hasValidConnectedness(dht.h, p) {
16021580
return

0 commit comments

Comments
 (0)