fix: crawler polluting peerstore #1053

Merged
merged 5 commits into from Mar 4, 2025
Changes from 4 commits
100 changes: 83 additions & 17 deletions crawler/crawler.go
@@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-msgio/pbio"
@@ -135,11 +136,63 @@ type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)
// HandleQueryFail is a callback on failed peer query
type HandleQueryFail func(p peer.ID, err error)

type peerAddrs struct {
peers map[peer.ID]map[string]ma.Multiaddr
lk *sync.RWMutex
}

func newPeerAddrs() peerAddrs {
return peerAddrs{
peers: make(map[peer.ID]map[string]ma.Multiaddr),
lk: new(sync.RWMutex),
}
}

func (ps peerAddrs) AddAddrs(peers map[peer.ID][]ma.Multiaddr) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.addAddrsNoLock(peers)
}

func (ps peerAddrs) RemoveSourceAndAddPeers(source peer.ID, peers map[peer.ID][]ma.Multiaddr) {
ps.lk.Lock()
defer ps.lk.Unlock()

// remove source from peerstore
delete(ps.peers, source)
// add peers to peerstore
ps.addAddrsNoLock(peers)
}

func (ps peerAddrs) addAddrsNoLock(peers map[peer.ID][]ma.Multiaddr) {
for p, addrs := range peers {
if _, ok := ps.peers[p]; !ok {
ps.peers[p] = make(map[string]ma.Multiaddr)
}
for _, addr := range addrs {
ps.peers[p][string(addr.Bytes())] = addr
}
}
}

func (ps peerAddrs) PeerInfo(p peer.ID) peer.AddrInfo {
ps.lk.RLock()
defer ps.lk.RUnlock()

addrs := make([]ma.Multiaddr, 0, len(ps.peers[p]))
for _, addr := range ps.peers[p] {
addrs = append(addrs, addr)
}
return peer.AddrInfo{ID: p, Addrs: addrs}
}

// Run crawls dht peers from an initial seed of `startingPeers`
func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
jobs := make(chan peer.ID, 1)
results := make(chan *queryResult, 1)

peerAddrs := newPeerAddrs()

// Start worker goroutines
var wg sync.WaitGroup
wg.Add(c.parallelism)
@@ -148,7 +201,8 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
defer wg.Done()
for p := range jobs {
qctx, cancel := context.WithTimeout(ctx, c.queryTimeout)
res := c.queryPeer(qctx, p)
ai := peerAddrs.PeerInfo(p)
res := c.queryPeer(qctx, ai)
cancel() // do not defer, cleanup after each job
results <- res
}
Expand All @@ -162,26 +216,28 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
peersSeen := make(map[peer.ID]struct{})

numSkipped := 0
initialPeersAddrs := make(map[peer.ID][]ma.Multiaddr)
for _, ai := range startingPeers {
extendAddrs := c.host.Peerstore().Addrs(ai.ID)
if len(ai.Addrs) > 0 {
extendAddrs = append(extendAddrs, ai.Addrs...)
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, c.dialAddressExtendDur)
}
if len(extendAddrs) == 0 {
numSkipped++
continue
}
initialPeersAddrs[ai.ID] = extendAddrs

toDial = append(toDial, ai)
peersSeen[ai.ID] = struct{}{}
}
peerAddrs.AddAddrs(initialPeersAddrs)

if numSkipped > 0 {
logger.Infof("%d starting peers were skipped due to lack of addresses. Starting crawl with %d peers", numSkipped, len(toDial))
}

numQueried := 0
peersQueried := make(map[peer.ID]struct{})
outstanding := 0

for len(toDial) > 0 || outstanding > 0 {
@@ -197,14 +253,20 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
if len(res.data) > 0 {
logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
addrsToUpdate := make(map[peer.ID][]ma.Multiaddr)
for p, ai := range res.data {
c.host.Peerstore().AddAddrs(p, ai.Addrs, c.dialAddressExtendDur)
if _, ok := peersQueried[p]; !ok {
addrsToUpdate[p] = ai.Addrs
}
if _, ok := peersSeen[p]; !ok {
peersSeen[p] = struct{}{}
toDial = append(toDial, ai)
}
rtPeers = append(rtPeers, ai)
}
peersQueried[res.peer] = struct{}{}
peerAddrs.RemoveSourceAndAddPeers(res.peer, addrsToUpdate)

if handleSuccess != nil {
handleSuccess(res.peer, rtPeers)
}
@@ -214,9 +276,8 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
outstanding--
case jobCh <- nextPeerID:
outstanding++
numQueried++
toDial = toDial[1:]
logger.Debugf("starting %d out of %d", numQueried, len(peersSeen))
logger.Debugf("starting %d out of %d", len(peersQueried)+1, len(peersSeen))
}
}
}
@@ -227,20 +288,25 @@ type queryResult struct {
err error
}

func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.AddrInfo) *queryResult {
tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer.ID), time.Hour, c.host.Peerstore(), time.Hour, nil)
if err != nil {
logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
return &queryResult{nextPeer, nil, err}
logger.Errorf("error creating rt for peer %v : %v", nextPeer.ID, err)
return &queryResult{nextPeer.ID, nil, err}
}

connCtx, cancel := context.WithTimeout(ctx, c.connectTimeout)
defer cancel()
err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
err = c.host.Connect(connCtx, nextPeer)
if err != nil {
logger.Debugf("could not connect to peer %v: %v", nextPeer, err)
return &queryResult{nextPeer, nil, err}
logger.Debugf("could not connect to peer %v: %v", nextPeer.ID, err)
return &queryResult{nextPeer.ID, nil, err}
}
// Extend peerstore address ttl for addresses whose ttl is below
// c.dialAddressExtendDur. By now identify has already cleaned up addresses
// provided to Connect above and only kept the listen addresses advertised by
// the remote peer
c.host.Peerstore().AddAddrs(nextPeer.ID, c.host.Peerstore().Addrs(nextPeer.ID), c.dialAddressExtendDur)

localPeers := make(map[peer.ID]*peer.AddrInfo)
var retErr error
@@ -249,9 +315,9 @@ func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *query
if err != nil {
panic(err)
}
peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer)
peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer.ID, generatePeer)
if err != nil {
logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err)
logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer.ID, cpl, err)
retErr = err
break
}
@@ -263,8 +329,8 @@ func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *query
}

if retErr != nil {
return &queryResult{nextPeer, nil, retErr}
return &queryResult{nextPeer.ID, nil, retErr}
}

return &queryResult{nextPeer, localPeers, retErr}
return &queryResult{nextPeer.ID, localPeers, retErr}
}
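
Note: the new peerAddrs type above is a crawl-scoped address cache that replaces writing every learned address into the host's peerstore. Below is a minimal, self-contained sketch of the same idea, not the PR's actual wiring: addresses are kept in a mutex-guarded map keyed by peer ID, deduplicated by their byte encoding, and a peer's entry is dropped once it has been queried. The peer ID and multiaddr used in main are placeholders for illustration.

package main

import (
	"fmt"
	"sync"

	"github.com/libp2p/go-libp2p/core/peer"
	ma "github.com/multiformats/go-multiaddr"
)

// addrCache is a crawl-local address book: addresses learned during the crawl
// never touch the host peerstore and can be discarded per peer.
type addrCache struct {
	mu    sync.RWMutex
	peers map[peer.ID]map[string]ma.Multiaddr
}

func newAddrCache() *addrCache {
	return &addrCache{peers: make(map[peer.ID]map[string]ma.Multiaddr)}
}

// Add records addresses for a peer, deduplicating by addr.Bytes().
func (c *addrCache) Add(p peer.ID, addrs []ma.Multiaddr) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.peers[p]; !ok {
		c.peers[p] = make(map[string]ma.Multiaddr)
	}
	for _, a := range addrs {
		c.peers[p][string(a.Bytes())] = a
	}
}

// Drop forgets a peer's addresses once it has been queried, keeping the cache bounded.
func (c *addrCache) Drop(p peer.ID) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.peers, p)
}

// AddrInfo assembles a dialable peer.AddrInfo from the cached addresses.
func (c *addrCache) AddrInfo(p peer.ID) peer.AddrInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	addrs := make([]ma.Multiaddr, 0, len(c.peers[p]))
	for _, a := range c.peers[p] {
		addrs = append(addrs, a)
	}
	return peer.AddrInfo{ID: p, Addrs: addrs}
}

func main() {
	// Placeholder peer ID (peer.ID is a string type); not a canonically encoded ID.
	pid := peer.ID("example-peer")
	addr := ma.StringCast("/ip4/127.0.0.1/tcp/4001")

	cache := newAddrCache()
	cache.Add(pid, []ma.Multiaddr{addr, addr})  // the duplicate collapses to one entry
	fmt.Println(len(cache.AddrInfo(pid).Addrs)) // 1
	cache.Drop(pid)
}
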
2 changes: 1 addition & 1 deletion fullrt/dht.go
@@ -163,7 +163,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
}

if fullrtcfg.crawler == nil {
fullrtcfg.crawler, err = crawler.NewDefaultCrawler(h, crawler.WithParallelism(200))
fullrtcfg.crawler, err = crawler.NewDefaultCrawler(h, crawler.WithParallelism(200), crawler.WithDialAddrExtendDuration(fullrtcfg.crawlInterval))
if err != nil {
return nil, err
}
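
Note: the fullrt change above passes the crawl interval to the new WithDialAddrExtendDuration crawler option, so dial addresses are only kept extended in the peerstore for roughly one crawl round. A rough usage sketch for a standalone consumer of the crawler package follows; it assumes the package lives at github.com/libp2p/go-libp2p-kad-dht/crawler, and the one-hour duration and the peerstore-based seeding are illustrative only.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	dhtcrawler "github.com/libp2p/go-libp2p-kad-dht/crawler"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	// Throwaway libp2p host used only for the crawl.
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Keep dial addresses in the peerstore only about as long as one crawl
	// round is expected to take (one hour here is an arbitrary example).
	c, err := dhtcrawler.NewDefaultCrawler(h,
		dhtcrawler.WithParallelism(200),
		dhtcrawler.WithDialAddrExtendDuration(time.Hour),
	)
	if err != nil {
		panic(err)
	}

	// Seed the crawl from peers already known to the host; real deployments
	// would add bootstrap peers here.
	var seeds []*peer.AddrInfo
	for _, p := range h.Peerstore().Peers() {
		ai := h.Peerstore().PeerInfo(p)
		seeds = append(seeds, &ai)
	}

	c.Run(ctx, seeds,
		func(p peer.ID, rtPeers []*peer.AddrInfo) {
			fmt.Printf("queried %s: %d routing table peers\n", p, len(rtPeers))
		},
		func(p peer.ID, err error) {
			fmt.Printf("failed to query %s: %v\n", p, err)
		},
	)
}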