Skip to content

Commit 68ad651

Browse files
query cleanup
1 parent 88720a0 commit 68ad651

File tree

3 files changed

+28
-55
lines changed

3 files changed

+28
-55
lines changed

lookup.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ import (
1414
"go.opentelemetry.io/otel/trace"
1515
)
1616

17-
// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a channel of
17+
// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a slice of
1818
// the K closest peers to the given key.
1919
//
20-
// If the context is canceled, this function will return the context error along
21-
// with the closest K peers it has found so far.
20+
// If the context is canceled, this function will return the context error
21+
// along with the closest K peers it has found so far.
2222
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
2323
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
2424
defer span.End()
@@ -27,9 +27,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID,
2727
return nil, fmt.Errorf("can't lookup empty key")
2828
}
2929

30-
//TODO: I can break the interface! return []peer.ID
30+
// TODO: I can break the interface! return []peer.ID
3131
lookupRes, err := dht.runLookupWithFollowup(ctx, key, dht.pmGetClosestPeers(key), func(*qpeerset.QueryPeerset) bool { return false })
32-
3332
if err != nil {
3433
return nil, err
3534
}
@@ -47,7 +46,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID,
4746
metrics.NetworkSize.M(int64(ns))
4847
}
4948

50-
// refresh the cpl for this key as the query was successful
49+
// Reset the refresh timer for this key's bucket since we've just
50+
// successfully interacted with the closest peers to key
5151
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
5252

5353
return lookupRes.peers, nil

query.go

+21-46
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ import (
2424
// ErrNoPeersQueried is returned when we failed to connect to any peers.
2525
var ErrNoPeersQueried = errors.New("failed to query any peers")
2626

27-
type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
28-
type stopFn func(*qpeerset.QueryPeerset) bool
27+
type (
28+
queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
29+
stopFn func(*qpeerset.QueryPeerset) bool
30+
)
2931

3032
// query represents a single DHT query.
3133
type query struct {
@@ -187,7 +189,7 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn
187189
q.recordValuablePeers()
188190
}
189191

190-
res := q.constructLookupResult(targetKadID)
192+
res := q.constructLookupResult()
191193
return res, q.queryPeers, nil
192194
}
193195

@@ -218,45 +220,29 @@ func (q *query) recordValuablePeers() {
218220
}
219221

220222
// constructLookupResult takes the query information and uses it to construct the lookup result
221-
func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult {
222-
// determine if the query terminated early
223-
completed := true
224-
225-
// Lookup and starvation are both valid ways for a lookup to complete. (Starvation does not imply failure.)
226-
// Lookup termination (as defined in isLookupTermination) is not possible in small networks.
227-
// Starvation is a successful query termination in small networks.
228-
if !(q.isLookupTermination() || q.isStarvationTermination()) {
229-
completed = false
230-
}
223+
func (q *query) constructLookupResult() *lookupWithFollowupResult {
224+
// Lookup and starvation are both valid ways for a lookup to complete.
225+
// (Starvation does not imply failure.) Lookup termination (as defined in
226+
// isLookupTermination) is not possible in small networks. Starvation is a
227+
// successful query termination in small networks.
228+
completed := q.isLookupTermination() || q.isStarvationTermination()
231229

232230
// extract the top K not unreachable peers
233-
var peers []peer.ID
234-
peerState := make(map[peer.ID]qpeerset.PeerState)
235-
qp := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)
236-
for _, p := range qp {
237-
state := q.queryPeers.GetState(p)
238-
peerState[p] = state
239-
peers = append(peers, p)
240-
}
241-
242-
// get the top K overall peers
243-
sortedPeers := kb.SortClosestPeers(peers, target)
244-
if len(sortedPeers) > q.dht.bucketSize {
245-
sortedPeers = sortedPeers[:q.dht.bucketSize]
246-
}
231+
peers := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)
247232

233+
// get the top K overall peers (including unreachable)
248234
closest := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried, qpeerset.PeerUnreachable)
249235

250236
// return the top K not unreachable peers as well as their states at the end of the query
251237
res := &lookupWithFollowupResult{
252-
peers: sortedPeers,
253-
state: make([]qpeerset.PeerState, len(sortedPeers)),
238+
peers: peers,
239+
state: make([]qpeerset.PeerState, len(peers)),
254240
completed: completed,
255241
closest: closest,
256242
}
257243

258-
for i, p := range sortedPeers {
259-
res.state[i] = peerState[p]
244+
for i, p := range peers {
245+
res.state[i] = q.queryPeers.GetState(p)
260246
}
261247

262248
return res
@@ -301,7 +287,7 @@ func (q *query) run() {
301287

302288
// termination is triggered on end-of-lookup conditions or starvation of unused peers
303289
// it also returns the peers we should query next for a maximum of `maxNumQueriesToSpawn` peers.
304-
ready, reason, qPeers := q.isReadyToTerminate(pathCtx, maxNumQueriesToSpawn)
290+
ready, reason, qPeers := q.isReadyToTerminate(maxNumQueriesToSpawn)
305291
if ready {
306292
q.terminate(pathCtx, cancelPath, reason)
307293
}
@@ -347,7 +333,7 @@ func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID
347333
go q.queryPeer(ctx, ch, queryPeer)
348334
}
349335

350-
func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) {
336+
func (q *query) isReadyToTerminate(nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) {
351337
// give the application logic a chance to terminate
352338
if q.stopFn(q.queryPeers) {
353339
return true, LookupStopped, nil
@@ -360,16 +346,7 @@ func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool
360346
}
361347

362348
// The peers we query next should be ones that we have only Heard about.
363-
var peersToQuery []peer.ID
364-
peers := q.queryPeers.GetClosestInStates(qpeerset.PeerHeard)
365-
count := 0
366-
for _, p := range peers {
367-
peersToQuery = append(peersToQuery, p)
368-
count++
369-
if count == nPeersToQuery {
370-
break
371-
}
372-
}
349+
peersToQuery := q.queryPeers.GetClosestNInStates(nPeersToQuery, qpeerset.PeerHeard)
373350

374351
return false, -1, peersToQuery
375352
}
@@ -415,12 +392,10 @@ func (q *query) terminate(ctx context.Context, cancel context.CancelFunc, reason
415392
// queryPeer queries a single peer and reports its findings on the channel.
416393
// queryPeer does not access the query state in queryPeers!
417394
func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID) {
418-
defer q.waitGroup.Done()
419-
420395
ctx, span := internal.StartSpan(ctx, "IpfsDHT.QueryPeer")
421396
defer span.End()
422397

423-
dialCtx, queryCtx := ctx, ctx
398+
dialCtx, queryCtx := ctx, q.ctx
424399

425400
// dial the peer
426401
if err := q.dht.dialPeer(dialCtx, p); err != nil {

routing.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -496,14 +496,12 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
496496
ctx, end := tracer.FindProvidersAsync(dhtName, ctx, key, count)
497497
defer func() { ch = end(ch, nil) }()
498498

499+
peerOut := make(chan peer.AddrInfo)
499500
if !dht.enableProviders || !key.Defined() {
500-
peerOut := make(chan peer.AddrInfo)
501501
close(peerOut)
502502
return peerOut
503503
}
504504

505-
peerOut := make(chan peer.AddrInfo)
506-
507505
keyMH := key.Hash()
508506

509507
logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))

0 commit comments

Comments
 (0)