Skip to content

Commit ef25808

Browse files
authored
chore: no lifecycle context to shutdown ProviderQueryManager (#734)
* no lifecycle context to shutdown ProviderQueryManager, use Close function instead.
1 parent f6befaf commit ef25808

File tree

5 files changed

+71
-60
lines changed

5 files changed

+71
-60
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ The following emojis are used to highlight certain changes:
6868
- `gateway`: `NewCacheBlockStore` and `NewCarBackend` will use `prometheus.DefaultRegisterer` when a custom one is not specified via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)
6969
- `filestore`: added opt-in `WithMMapReader` option to `FileManager` to enable memory-mapped file reads [#665](https://github.com/ipfs/boxo/pull/665)
7070
- `bitswap/routing` `ProviderQueryManager` does not require calling `Startup` separate from `New`. [#741](https://github.com/ipfs/boxo/pull/741)
71+
- `bitswap/routing` `ProviderQueryManager` does not use lifecycle context; call `Close` to shut it down. [#734](https://github.com/ipfs/boxo/pull/734)
7172

7273
### Changed
7374

bitswap/client/bitswap_with_sessions_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ func assertBlockListsFrom(from peer.ID, got, exp []blocks.Block) error {
117117
// TestCustomProviderQueryManager tests that nothing breaks if we use a custom
118118
// PQM when creating bitswap.
119119
func TestCustomProviderQueryManager(t *testing.T) {
120-
ctx, cancel := context.WithCancel(context.Background())
121-
defer cancel()
122-
123120
vnet := getVirtualNetwork()
124121
router := mockrouting.NewServer()
125122
ig := testinstance.NewTestInstanceGenerator(vnet, router, nil, nil)
@@ -130,10 +127,15 @@ func TestCustomProviderQueryManager(t *testing.T) {
130127
b := ig.Next()
131128

132129
// Replace bitswap in instance a with our customized one.
133-
pqm, err := providerquerymanager.New(ctx, a.Adapter, router.Client(a.Identity))
130+
pqm, err := providerquerymanager.New(a.Adapter, router.Client(a.Identity))
134131
if err != nil {
135132
t.Fatal(err)
136133
}
134+
defer pqm.Close()
135+
136+
ctx, cancel := context.WithCancel(context.Background())
137+
defer cancel()
138+
137139
bs := bitswap.New(ctx, a.Adapter, pqm, a.Blockstore,
138140
bitswap.WithClientOption(client.WithDefaultProviderQueryManager(false)))
139141
a.Exchange.Close() // close old to be sure.

bitswap/client/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr
182182

183183
if bs.providerFinder != nil && bs.defaultProviderQueryManager {
184184
// network can do dialing.
185-
pqm, err := rpqm.New(ctx, network, bs.providerFinder,
185+
pqm, err := rpqm.New(network, bs.providerFinder,
186186
rpqm.WithMaxInProcessRequests(16),
187187
rpqm.WithMaxProviders(10),
188188
rpqm.WithMaxTimeout(10*time.Second))
@@ -512,6 +512,9 @@ func (bs *Client) Close() error {
512512
close(bs.closing)
513513
bs.sm.Shutdown()
514514
bs.cancel()
515+
if bs.pqm != nil {
516+
bs.pqm.Close()
517+
}
515518
bs.notif.Shutdown()
516519
})
517520
return nil

routing/providerquerymanager/providerquerymanager.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ type cancelRequestMessage struct {
8585
// - ensure two findprovider calls for the same block don't run concurrently
8686
// - manage timeouts
8787
type ProviderQueryManager struct {
88-
ctx context.Context
88+
closeOnce sync.Once
89+
closing chan struct{}
8990
dialer ProviderQueryDialer
9091
router ProviderQueryRouter
9192
providerQueryMessages chan providerQueryMessage
@@ -133,9 +134,9 @@ func WithMaxProviders(count int) Option {
133134

134135
// New initializes a new ProviderQueryManager for a given dialer and a given
135136
// provider router. Call Close to shut the manager down.
136-
func New(ctx context.Context, dialer ProviderQueryDialer, router ProviderQueryRouter, opts ...Option) (*ProviderQueryManager, error) {
137+
func New(dialer ProviderQueryDialer, router ProviderQueryRouter, opts ...Option) (*ProviderQueryManager, error) {
137138
pqm := &ProviderQueryManager{
138-
ctx: ctx,
139+
closing: make(chan struct{}),
139140
dialer: dialer,
140141
router: router,
141142
providerQueryMessages: make(chan providerQueryMessage),
@@ -155,6 +156,12 @@ func New(ctx context.Context, dialer ProviderQueryDialer, router ProviderQueryRo
155156
return pqm, nil
156157
}
157158

159+
func (pqm *ProviderQueryManager) Close() {
160+
pqm.closeOnce.Do(func() {
161+
close(pqm.closing)
162+
})
163+
}
164+
158165
type inProgressRequest struct {
159166
providersSoFar []peer.AddrInfo
160167
incoming chan peer.AddrInfo
@@ -180,7 +187,7 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context,
180187
k: k,
181188
inProgressRequestChan: inProgressRequestChan,
182189
}:
183-
case <-pqm.ctx.Done():
190+
case <-pqm.closing:
184191
ch := make(chan peer.AddrInfo)
185192
close(ch)
186193
span.End()
@@ -196,7 +203,7 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context,
196203
// get to receiveProviders.
197204
var receivedInProgressRequest inProgressRequest
198205
select {
199-
case <-pqm.ctx.Done():
206+
case <-pqm.closing:
200207
ch := make(chan peer.AddrInfo)
201208
close(ch)
202209
span.End()
@@ -256,7 +263,7 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
256263

257264
for receivedProviders.Len() > 0 || incomingProviders != nil {
258265
select {
259-
case <-pqm.ctx.Done():
266+
case <-pqm.closing:
260267
return
261268
case <-sessionCtx.Done():
262269
if incomingProviders != nil {
@@ -300,7 +307,7 @@ func (pqm *ProviderQueryManager) cancelProviderRequest(ctx context.Context, k ci
300307
if !ok {
301308
return
302309
}
303-
case <-pqm.ctx.Done():
310+
case <-pqm.closing:
304311
return
305312
}
306313
}
@@ -316,13 +323,13 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
316323
}
317324

318325
// Read find provider requests until channel is closed. The channel is
319-
// closed as soon as pqm.ctx is canceled, so there is no need to select on
320-
// that context here.
326+
// closed as soon as pqm.Close is called, so there is no need to select on
327+
// any other channel to detect shutdown.
321328
for fpr := range pqm.providerRequestsProcessing.Out() {
322329
if findSem != nil {
323330
select {
324331
case findSem <- struct{}{}:
325-
case <-pqm.ctx.Done():
332+
case <-pqm.closing:
326333
return
327334
}
328335
}
@@ -362,7 +369,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
362369
k: k,
363370
p: p,
364371
}:
365-
case <-pqm.ctx.Done():
372+
case <-pqm.closing:
366373
return
367374
}
368375
}(p)
@@ -374,7 +381,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
374381
ctx: ctx,
375382
k: k,
376383
}:
377-
case <-pqm.ctx.Done():
384+
case <-pqm.closing:
378385
}
379386
}(fpr.ctx, fpr.k)
380387
}
@@ -402,7 +409,7 @@ func (pqm *ProviderQueryManager) run() {
402409
case nextMessage := <-pqm.providerQueryMessages:
403410
nextMessage.debugMessage()
404411
nextMessage.handle(pqm)
405-
case <-pqm.ctx.Done():
412+
case <-pqm.closing:
406413
return
407414
}
408415
}
@@ -423,7 +430,7 @@ func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) {
423430
for listener := range requestStatus.listeners {
424431
select {
425432
case listener <- rpm.p:
426-
case <-pqm.ctx.Done():
433+
case <-pqm.closing:
427434
return
428435
}
429436
}
@@ -458,12 +465,12 @@ func (npqm *newProvideQueryMessage) debugMessage() {
458465
func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
459466
requestStatus, ok := pqm.inProgressRequestStatuses[npqm.k]
460467
if !ok {
461-
ctx, cancelFn := context.WithCancel(pqm.ctx)
468+
ctx, cancelFn := context.WithCancel(context.Background())
462469
span := trace.SpanFromContext(npqm.ctx)
463470
span.AddEvent("NewQuery", trace.WithAttributes(attribute.Stringer("cid", npqm.k)))
464471
ctx = trace.ContextWithSpan(ctx, span)
465472

466-
// Use context derived from pqm.ctx here, and not the context from the
473+
// Use context derived from background here, and not the context from the
467474
// request (npqm.ctx), because this inProgressRequestStatus applies to
468475
// all in-progress requests for the CID (npqm.k).
469476
//
@@ -486,7 +493,7 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
486493
k: npqm.k,
487494
ctx: ctx,
488495
}:
489-
case <-pqm.ctx.Done():
496+
case <-pqm.closing:
490497
return
491498
}
492499
} else {
@@ -502,7 +509,7 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
502509
providersSoFar: requestStatus.providersSoFar,
503510
incoming: inProgressChan,
504511
}:
505-
case <-pqm.ctx.Done():
512+
case <-pqm.closing:
506513
}
507514
}
508515

routing/providerquerymanager/providerquerymanager_test.go

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,11 @@ func TestNormalSimultaneousFetch(t *testing.T) {
7474
peersFound: peers,
7575
delay: 1 * time.Millisecond,
7676
}
77-
ctx := context.Background()
78-
providerQueryManager := mustNotErr(New(ctx, fpd, fpn))
77+
providerQueryManager := mustNotErr(New(fpd, fpn))
78+
defer providerQueryManager.Close()
7979
keys := random.Cids(2)
8080

81-
sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
81+
sessionCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
8282
defer cancel()
8383
firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0], 0)
8484
secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[1], 0)
@@ -111,11 +111,11 @@ func TestDedupingProviderRequests(t *testing.T) {
111111
peersFound: peers,
112112
delay: 1 * time.Millisecond,
113113
}
114-
ctx := context.Background()
115-
providerQueryManager := mustNotErr(New(ctx, fpd, fpn))
114+
providerQueryManager := mustNotErr(New(fpd, fpn))
115+
defer providerQueryManager.Close()
116116
key := random.Cids(1)[0]
117117

118-
sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
118+
sessionCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
119119
defer cancel()
120120
firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0)
121121
secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0)
@@ -151,12 +151,13 @@ func TestCancelOneRequestDoesNotTerminateAnother(t *testing.T) {
151151
peersFound: peers,
152152
delay: 1 * time.Millisecond,
153153
}
154-
ctx := context.Background()
155-
providerQueryManager := mustNotErr(New(ctx, fpd, fpn))
154+
providerQueryManager := mustNotErr(New(fpd, fpn))
155+
defer providerQueryManager.Close()
156156

157157
key := random.Cids(1)[0]
158158

159159
// first session will cancel before done
160+
ctx := context.Background()
160161
firstSessionCtx, firstCancel := context.WithTimeout(ctx, 3*time.Millisecond)
161162
defer firstCancel()
162163
firstRequestChan := providerQueryManager.FindProvidersAsync(firstSessionCtx, key, 0)
@@ -195,14 +196,13 @@ func TestCancelManagerExitsGracefully(t *testing.T) {
195196
peersFound: peers,
196197
delay: 1 * time.Millisecond,
197198
}
198-
ctx := context.Background()
199-
managerCtx, managerCancel := context.WithTimeout(ctx, 5*time.Millisecond)
200-
defer managerCancel()
201-
providerQueryManager := mustNotErr(New(managerCtx, fpd, fpn))
199+
providerQueryManager := mustNotErr(New(fpd, fpn))
200+
defer providerQueryManager.Close()
201+
time.AfterFunc(5*time.Millisecond, providerQueryManager.Close)
202202

203203
key := random.Cids(1)[0]
204204

205-
sessionCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
205+
sessionCtx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
206206
defer cancel()
207207
firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0)
208208
secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0)
@@ -232,12 +232,12 @@ func TestPeersWithConnectionErrorsNotAddedToPeerList(t *testing.T) {
232232
peersFound: peers,
233233
delay: 1 * time.Millisecond,
234234
}
235-
ctx := context.Background()
236-
providerQueryManager := mustNotErr(New(ctx, fpd, fpn))
235+
providerQueryManager := mustNotErr(New(fpd, fpn))
236+
defer providerQueryManager.Close()
237237

238238
key := random.Cids(1)[0]
239239

240-
sessionCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
240+
sessionCtx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
241241
defer cancel()
242242
firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0)
243243
secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, 0)
@@ -266,13 +266,11 @@ func TestRateLimitingRequests(t *testing.T) {
266266
peersFound: peers,
267267
delay: 5 * time.Millisecond,
268268
}
269-
ctx := context.Background()
270-
ctx, cancel := context.WithCancel(ctx)
271-
defer cancel()
272-
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxInProcessRequests(maxInProcessRequests)))
269+
providerQueryManager := mustNotErr(New(fpd, fpn, WithMaxInProcessRequests(maxInProcessRequests)))
270+
defer providerQueryManager.Close()
273271

274272
keys := random.Cids(maxInProcessRequests + 1)
275-
sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
273+
sessionCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
276274
defer cancel()
277275
var requestChannels []<-chan peer.AddrInfo
278276
for i := 0; i < maxInProcessRequests+1; i++ {
@@ -307,11 +305,11 @@ func TestUnlimitedRequests(t *testing.T) {
307305
peersFound: peers,
308306
delay: 5 * time.Millisecond,
309307
}
310-
ctx := context.Background()
311-
ctx, cancel := context.WithCancel(ctx)
312-
defer cancel()
313-
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxInProcessRequests(0)))
308+
providerQueryManager := mustNotErr(New(fpd, fpn, WithMaxInProcessRequests(0)))
309+
defer providerQueryManager.Close()
314310

311+
ctx, cancel := context.WithCancel(context.Background())
312+
defer cancel()
315313
keys := random.Cids(inProcessRequests)
316314
sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
317315
defer cancel()
@@ -346,11 +344,11 @@ func TestFindProviderTimeout(t *testing.T) {
346344
peersFound: peers,
347345
delay: 10 * time.Millisecond,
348346
}
349-
ctx := context.Background()
350-
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxTimeout(2*time.Millisecond)))
347+
providerQueryManager := mustNotErr(New(fpd, fpn, WithMaxTimeout(2*time.Millisecond)))
348+
defer providerQueryManager.Close()
351349
keys := random.Cids(1)
352350

353-
sessionCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
351+
sessionCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
354352
defer cancel()
355353
firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0], 0)
356354
var firstPeersReceived []peer.AddrInfo
@@ -369,11 +367,11 @@ func TestFindProviderPreCanceled(t *testing.T) {
369367
peersFound: peers,
370368
delay: 1 * time.Millisecond,
371369
}
372-
ctx := context.Background()
373-
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxTimeout(100*time.Millisecond)))
370+
providerQueryManager := mustNotErr(New(fpd, fpn, WithMaxTimeout(100*time.Millisecond)))
371+
defer providerQueryManager.Close()
374372
keys := random.Cids(1)
375373

376-
sessionCtx, cancel := context.WithCancel(ctx)
374+
sessionCtx, cancel := context.WithCancel(context.Background())
377375
cancel()
378376
firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0], 0)
379377
if firstRequestChan == nil {
@@ -393,11 +391,11 @@ func TestCancelFindProvidersAfterCompletion(t *testing.T) {
393391
peersFound: peers,
394392
delay: 1 * time.Millisecond,
395393
}
396-
ctx := context.Background()
397-
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxTimeout(100*time.Millisecond)))
394+
providerQueryManager := mustNotErr(New(fpd, fpn, WithMaxTimeout(100*time.Millisecond)))
395+
defer providerQueryManager.Close()
398396
keys := random.Cids(1)
399397

400-
sessionCtx, cancel := context.WithCancel(ctx)
398+
sessionCtx, cancel := context.WithCancel(context.Background())
401399
firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, keys[0], 0)
402400
<-firstRequestChan // wait for everything to start.
403401
time.Sleep(10 * time.Millisecond) // wait for the incoming providers to stop.
@@ -425,11 +423,11 @@ func TestLimitedProviders(t *testing.T) {
425423
peersFound: peers,
426424
delay: 1 * time.Millisecond,
427425
}
428-
ctx := context.Background()
429-
providerQueryManager := mustNotErr(New(ctx, fpd, fpn, WithMaxProviders(max), WithMaxTimeout(100*time.Millisecond)))
426+
providerQueryManager := mustNotErr(New(fpd, fpn, WithMaxProviders(max), WithMaxTimeout(100*time.Millisecond)))
427+
defer providerQueryManager.Close()
430428
keys := random.Cids(1)
431429

432-
providersChan := providerQueryManager.FindProvidersAsync(ctx, keys[0], 0)
430+
providersChan := providerQueryManager.FindProvidersAsync(context.Background(), keys[0], 0)
433431
total := 0
434432
for range providersChan {
435433
total++

0 commit comments

Comments
 (0)