Skip to content

Commit 2c3a43e

Browse files
fix: crawler polluting peerstore (#1053)
* fix: crawler polluting peerstore
* rename tempPeerstore to peerAddrs
* DialAddrExtendDuration matches fullrt crawl interval
* store multiaddr in map directly
* remove unnecessary map
1 parent 10cab74 commit 2c3a43e

File tree

2 files changed

+82
-18
lines changed

2 files changed

+82
-18
lines changed

crawler/crawler.go

+81-17
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/libp2p/go-libp2p/core/network"
1010
"github.com/libp2p/go-libp2p/core/peer"
1111
"github.com/libp2p/go-libp2p/core/protocol"
12+
ma "github.com/multiformats/go-multiaddr"
1213

1314
logging "github.com/ipfs/go-log/v2"
1415
"github.com/libp2p/go-msgio/pbio"
@@ -135,11 +136,61 @@ type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)
135136
// HandleQueryFail is a callback on failed peer query
136137
type HandleQueryFail func(p peer.ID, err error)
137138

139+
type peerAddrs struct {
140+
peers map[peer.ID]map[string]ma.Multiaddr
141+
lk *sync.RWMutex
142+
}
143+
144+
func newPeerAddrs() peerAddrs {
145+
return peerAddrs{
146+
peers: make(map[peer.ID]map[string]ma.Multiaddr),
147+
lk: new(sync.RWMutex),
148+
}
149+
}
150+
151+
func (ps peerAddrs) RemoveSourceAndAddPeers(source peer.ID, peers map[peer.ID][]ma.Multiaddr) {
152+
ps.lk.Lock()
153+
defer ps.lk.Unlock()
154+
155+
// remove source from peerstore
156+
delete(ps.peers, source)
157+
// add peers to peerstore
158+
ps.addAddrsNoLock(peers)
159+
}
160+
161+
func (ps peerAddrs) addAddrsNoLock(peers map[peer.ID][]ma.Multiaddr) {
162+
for p, addrs := range peers {
163+
ps.addPeerAddrsNoLock(p, addrs)
164+
}
165+
}
166+
167+
func (ps peerAddrs) addPeerAddrsNoLock(p peer.ID, addrs []ma.Multiaddr) {
168+
if _, ok := ps.peers[p]; !ok {
169+
ps.peers[p] = make(map[string]ma.Multiaddr)
170+
}
171+
for _, addr := range addrs {
172+
ps.peers[p][string(addr.Bytes())] = addr
173+
}
174+
}
175+
176+
func (ps peerAddrs) PeerInfo(p peer.ID) peer.AddrInfo {
177+
ps.lk.RLock()
178+
defer ps.lk.RUnlock()
179+
180+
addrs := make([]ma.Multiaddr, 0, len(ps.peers[p]))
181+
for _, addr := range ps.peers[p] {
182+
addrs = append(addrs, addr)
183+
}
184+
return peer.AddrInfo{ID: p, Addrs: addrs}
185+
}
186+
138187
// Run crawls dht peers from an initial seed of `startingPeers`
139188
func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
140189
jobs := make(chan peer.ID, 1)
141190
results := make(chan *queryResult, 1)
142191

192+
peerAddrs := newPeerAddrs()
193+
143194
// Start worker goroutines
144195
var wg sync.WaitGroup
145196
wg.Add(c.parallelism)
@@ -148,7 +199,8 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
148199
defer wg.Done()
149200
for p := range jobs {
150201
qctx, cancel := context.WithTimeout(ctx, c.queryTimeout)
151-
res := c.queryPeer(qctx, p)
202+
ai := peerAddrs.PeerInfo(p)
203+
res := c.queryPeer(qctx, ai)
152204
cancel() // do not defer, cleanup after each job
153205
results <- res
154206
}
@@ -162,26 +214,28 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
162214
peersSeen := make(map[peer.ID]struct{})
163215

164216
numSkipped := 0
217+
peerAddrs.lk.Lock()
165218
for _, ai := range startingPeers {
166219
extendAddrs := c.host.Peerstore().Addrs(ai.ID)
167220
if len(ai.Addrs) > 0 {
168221
extendAddrs = append(extendAddrs, ai.Addrs...)
169-
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, c.dialAddressExtendDur)
170222
}
171223
if len(extendAddrs) == 0 {
172224
numSkipped++
173225
continue
174226
}
227+
peerAddrs.addPeerAddrsNoLock(ai.ID, extendAddrs)
175228

176229
toDial = append(toDial, ai)
177230
peersSeen[ai.ID] = struct{}{}
178231
}
232+
peerAddrs.lk.Unlock()
179233

180234
if numSkipped > 0 {
181235
logger.Infof("%d starting peers were skipped due to lack of addresses. Starting crawl with %d peers", numSkipped, len(toDial))
182236
}
183237

184-
numQueried := 0
238+
peersQueried := make(map[peer.ID]struct{})
185239
outstanding := 0
186240

187241
for len(toDial) > 0 || outstanding > 0 {
@@ -197,14 +251,20 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
197251
if len(res.data) > 0 {
198252
logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
199253
rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
254+
addrsToUpdate := make(map[peer.ID][]ma.Multiaddr)
200255
for p, ai := range res.data {
201-
c.host.Peerstore().AddAddrs(p, ai.Addrs, c.dialAddressExtendDur)
256+
if _, ok := peersQueried[p]; !ok {
257+
addrsToUpdate[p] = ai.Addrs
258+
}
202259
if _, ok := peersSeen[p]; !ok {
203260
peersSeen[p] = struct{}{}
204261
toDial = append(toDial, ai)
205262
}
206263
rtPeers = append(rtPeers, ai)
207264
}
265+
peersQueried[res.peer] = struct{}{}
266+
peerAddrs.RemoveSourceAndAddPeers(res.peer, addrsToUpdate)
267+
208268
if handleSuccess != nil {
209269
handleSuccess(res.peer, rtPeers)
210270
}
@@ -214,9 +274,8 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
214274
outstanding--
215275
case jobCh <- nextPeerID:
216276
outstanding++
217-
numQueried++
218277
toDial = toDial[1:]
219-
logger.Debugf("starting %d out of %d", numQueried, len(peersSeen))
278+
logger.Debugf("starting %d out of %d", len(peersQueried)+1, len(peersSeen))
220279
}
221280
}
222281
}
@@ -227,20 +286,25 @@ type queryResult struct {
227286
err error
228287
}
229288

230-
func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
231-
tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
289+
func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.AddrInfo) *queryResult {
290+
tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer.ID), time.Hour, c.host.Peerstore(), time.Hour, nil)
232291
if err != nil {
233-
logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
234-
return &queryResult{nextPeer, nil, err}
292+
logger.Errorf("error creating rt for peer %v : %v", nextPeer.ID, err)
293+
return &queryResult{nextPeer.ID, nil, err}
235294
}
236295

237296
connCtx, cancel := context.WithTimeout(ctx, c.connectTimeout)
238297
defer cancel()
239-
err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
298+
err = c.host.Connect(connCtx, nextPeer)
240299
if err != nil {
241-
logger.Debugf("could not connect to peer %v: %v", nextPeer, err)
242-
return &queryResult{nextPeer, nil, err}
300+
logger.Debugf("could not connect to peer %v: %v", nextPeer.ID, err)
301+
return &queryResult{nextPeer.ID, nil, err}
243302
}
303+
// Extend peerstore address ttl for addresses whose ttl is below
304+
// c.dialAddressExtendDur. By now identify has already cleaned up addresses
305+
// provided to Connect above and only kept the listen addresses advertised by
306+
// the remote peer
307+
c.host.Peerstore().AddAddrs(nextPeer.ID, c.host.Peerstore().Addrs(nextPeer.ID), c.dialAddressExtendDur)
244308

245309
localPeers := make(map[peer.ID]*peer.AddrInfo)
246310
var retErr error
@@ -249,9 +313,9 @@ func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *query
249313
if err != nil {
250314
panic(err)
251315
}
252-
peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer)
316+
peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer.ID, generatePeer)
253317
if err != nil {
254-
logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err)
318+
logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer.ID, cpl, err)
255319
retErr = err
256320
break
257321
}
@@ -263,8 +327,8 @@ func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *query
263327
}
264328

265329
if retErr != nil {
266-
return &queryResult{nextPeer, nil, retErr}
330+
return &queryResult{nextPeer.ID, nil, retErr}
267331
}
268332

269-
return &queryResult{nextPeer, localPeers, retErr}
333+
return &queryResult{nextPeer.ID, localPeers, retErr}
270334
}

fullrt/dht.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
163163
}
164164

165165
if fullrtcfg.crawler == nil {
166-
fullrtcfg.crawler, err = crawler.NewDefaultCrawler(h, crawler.WithParallelism(200))
166+
fullrtcfg.crawler, err = crawler.NewDefaultCrawler(h, crawler.WithParallelism(200), crawler.WithDialAddrExtendDuration(fullrtcfg.crawlInterval))
167167
if err != nil {
168168
return nil, err
169169
}

0 commit comments

Comments
 (0)