diff --git a/crawler/crawler.go b/crawler/crawler.go
index 9927df659..4c243e81a 100644
--- a/crawler/crawler.go
+++ b/crawler/crawler.go
@@ -9,6 +9,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/protocol"
+	ma "github.com/multiformats/go-multiaddr"

 	logging "github.com/ipfs/go-log/v2"
 	"github.com/libp2p/go-msgio/pbio"
@@ -135,11 +136,61 @@ type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)

 // HandleQueryFail is a callback on failed peer query
 type HandleQueryFail func(p peer.ID, err error)

+type peerAddrs struct {
+	peers map[peer.ID]map[string]ma.Multiaddr
+	lk    *sync.RWMutex
+}
+
+func newPeerAddrs() peerAddrs {
+	return peerAddrs{
+		peers: make(map[peer.ID]map[string]ma.Multiaddr),
+		lk:    new(sync.RWMutex),
+	}
+}
+
+func (ps peerAddrs) RemoveSourceAndAddPeers(source peer.ID, peers map[peer.ID][]ma.Multiaddr) {
+	ps.lk.Lock()
+	defer ps.lk.Unlock()
+
+	// remove source from peerstore
+	delete(ps.peers, source)
+	// add peers to peerstore
+	ps.addAddrsNoLock(peers)
+}
+
+func (ps peerAddrs) addAddrsNoLock(peers map[peer.ID][]ma.Multiaddr) {
+	for p, addrs := range peers {
+		ps.addPeerAddrsNoLock(p, addrs)
+	}
+}
+
+func (ps peerAddrs) addPeerAddrsNoLock(p peer.ID, addrs []ma.Multiaddr) {
+	if _, ok := ps.peers[p]; !ok {
+		ps.peers[p] = make(map[string]ma.Multiaddr)
+	}
+	for _, addr := range addrs {
+		ps.peers[p][string(addr.Bytes())] = addr
+	}
+}
+
+func (ps peerAddrs) PeerInfo(p peer.ID) peer.AddrInfo {
+	ps.lk.RLock()
+	defer ps.lk.RUnlock()
+
+	addrs := make([]ma.Multiaddr, 0, len(ps.peers[p]))
+	for _, addr := range ps.peers[p] {
+		addrs = append(addrs, addr)
+	}
+	return peer.AddrInfo{ID: p, Addrs: addrs}
+}
+
 // Run crawls dht peers from an initial seed of `startingPeers`
 func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
 	jobs := make(chan peer.ID, 1)
 	results := make(chan *queryResult, 1)

+	peerAddrs := newPeerAddrs()
+
 	// Start worker goroutines
 	var wg sync.WaitGroup
 	wg.Add(c.parallelism)
@@ -148,7 +199,8 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
 		defer wg.Done()
 		for p := range jobs {
 			qctx, cancel := context.WithTimeout(ctx, c.queryTimeout)
-			res := c.queryPeer(qctx, p)
+			ai := peerAddrs.PeerInfo(p)
+			res := c.queryPeer(qctx, ai)
 			cancel() // do not defer, cleanup after each job
 			results <- res
 		}
@@ -162,26 +214,28 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
 	peersSeen := make(map[peer.ID]struct{})

 	numSkipped := 0
+	peerAddrs.lk.Lock()
 	for _, ai := range startingPeers {
 		extendAddrs := c.host.Peerstore().Addrs(ai.ID)
 		if len(ai.Addrs) > 0 {
 			extendAddrs = append(extendAddrs, ai.Addrs...)
-			c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, c.dialAddressExtendDur)
 		}
 		if len(extendAddrs) == 0 {
 			numSkipped++
 			continue
 		}
+		peerAddrs.addPeerAddrsNoLock(ai.ID, extendAddrs)
 		toDial = append(toDial, ai)
 		peersSeen[ai.ID] = struct{}{}
 	}
+	peerAddrs.lk.Unlock()

 	if numSkipped > 0 {
 		logger.Infof("%d starting peers were skipped due to lack of addresses. Starting crawl with %d peers", numSkipped, len(toDial))
 	}

-	numQueried := 0
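+	// peersQueried tracks peers whose query has already completed, so that
+	// their addresses are not overwritten later by entries learned from other
+	// peers' routing tables.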
+	peersQueried := make(map[peer.ID]struct{})
 	outstanding := 0

 	for len(toDial) > 0 || outstanding > 0 {
@@ -197,14 +251,20 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
 			if len(res.data) > 0 {
 				logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
 				rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
+				addrsToUpdate := make(map[peer.ID][]ma.Multiaddr)
 				for p, ai := range res.data {
-					c.host.Peerstore().AddAddrs(p, ai.Addrs, c.dialAddressExtendDur)
+					if _, ok := peersQueried[p]; !ok {
+						addrsToUpdate[p] = ai.Addrs
+					}
 					if _, ok := peersSeen[p]; !ok {
 						peersSeen[p] = struct{}{}
 						toDial = append(toDial, ai)
 					}
 					rtPeers = append(rtPeers, ai)
 				}
+				peersQueried[res.peer] = struct{}{}
+				peerAddrs.RemoveSourceAndAddPeers(res.peer, addrsToUpdate)
+
 				if handleSuccess != nil {
 					handleSuccess(res.peer, rtPeers)
 				}
@@ -214,9 +274,8 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
 			outstanding--
 		case jobCh <- nextPeerID:
 			outstanding++
-			numQueried++
 			toDial = toDial[1:]
-			logger.Debugf("starting %d out of %d", numQueried, len(peersSeen))
+			logger.Debugf("starting %d out of %d", len(peersQueried)+1, len(peersSeen))
 		}
 	}
 }
@@ -227,20 +286,25 @@ type queryResult struct {
 	err error
 }

-func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
-	tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
+func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.AddrInfo) *queryResult {
+	tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer.ID), time.Hour, c.host.Peerstore(), time.Hour, nil)
 	if err != nil {
-		logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
-		return &queryResult{nextPeer, nil, err}
+		logger.Errorf("error creating rt for peer %v : %v", nextPeer.ID, err)
+		return &queryResult{nextPeer.ID, nil, err}
 	}

 	connCtx, cancel := context.WithTimeout(ctx, c.connectTimeout)
 	defer cancel()
-	err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
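+	// nextPeer now carries the addresses recorded during the crawl, so the
+	// dial can succeed even when the host peerstore has no addresses for it.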
+	err = c.host.Connect(connCtx, nextPeer)
 	if err != nil {
-		logger.Debugf("could not connect to peer %v: %v", nextPeer, err)
-		return &queryResult{nextPeer, nil, err}
+		logger.Debugf("could not connect to peer %v: %v", nextPeer.ID, err)
+		return &queryResult{nextPeer.ID, nil, err}
 	}
+	// Extend peerstore address TTL for addresses whose TTL is below
+	// c.dialAddressExtendDur. By now identify has already cleaned up addresses
+	// provided to Connect above and only kept the listen addresses advertised by
+	// the remote peer.
+	c.host.Peerstore().AddAddrs(nextPeer.ID, c.host.Peerstore().Addrs(nextPeer.ID), c.dialAddressExtendDur)

 	localPeers := make(map[peer.ID]*peer.AddrInfo)
 	var retErr error
@@ -249,9 +313,9 @@ func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
 		if err != nil {
 			panic(err)
 		}
-		peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer)
+		peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer.ID, generatePeer)
 		if err != nil {
-			logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err)
+			logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer.ID, cpl, err)
 			retErr = err
 			break
 		}
@@ -263,8 +327,8 @@
 	}

 	if retErr != nil {
-		return &queryResult{nextPeer, nil, retErr}
+		return &queryResult{nextPeer.ID, nil, retErr}
 	}

-	return &queryResult{nextPeer, localPeers, retErr}
+	return &queryResult{nextPeer.ID, localPeers, retErr}
 }
diff --git a/fullrt/dht.go b/fullrt/dht.go
index 6d6c36bf2..2d15fd1f9 100644
--- a/fullrt/dht.go
+++ b/fullrt/dht.go
@@ -163,7 +163,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
 	}

 	if fullrtcfg.crawler == nil {
-		fullrtcfg.crawler, err = crawler.NewDefaultCrawler(h, crawler.WithParallelism(200))
+		fullrtcfg.crawler, err = crawler.NewDefaultCrawler(h, crawler.WithParallelism(200), crawler.WithDialAddrExtendDuration(fullrtcfg.crawlInterval))
 		if err != nil {
 			return nil, err
 		}
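Usage note: with this change the crawler keeps addresses it learns in its own peerAddrs book and only extends TTLs in the host peerstore for peers it actually connected to. A minimal sketch of driving the crawler directly with the new option, mirroring what fullrt does above; the package name, the 30-minute interval, and the callback bodies are placeholder assumptions, while the crawler API calls are the ones shown in this diff:

package crawl

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-kad-dht/crawler"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
)

// crawlOnce runs a single crawl from the given seed peers. The extend
// duration plays the role of fullrtcfg.crawlInterval above: addresses of
// responsive peers stay dialable in the host peerstore until roughly the
// next crawl.
func crawlOnce(ctx context.Context, h host.Host, seeds []*peer.AddrInfo) error {
	c, err := crawler.NewDefaultCrawler(h,
		crawler.WithParallelism(200),
		crawler.WithDialAddrExtendDuration(30*time.Minute), // placeholder crawl interval
	)
	if err != nil {
		return err
	}
	c.Run(ctx, seeds,
		func(p peer.ID, rtPeers []*peer.AddrInfo) {
			// p answered; rtPeers holds the entries from its routing table
		},
		func(p peer.ID, err error) {
			// p could not be connected to or queried
		},
	)
	return nil
}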