Skip to content

Commit 1ff84ce

Browse files
fix(bitswap): blockpresencemanager leak (#833)
1 parent 9e257a7 commit 1ff84ce

File tree

6 files changed

+54
-39
lines changed

6 files changed

+54
-39
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ The following emojis are used to highlight certain changes:
2222

2323
### Fixed
2424

25-
- Fix memory leak due to not cleaning up wantlists [#829](https://github.com/ipfs/boxo/pull/829)
25+
- Fix memory leak due to not cleaning up wantlists [#829](https://github.com/ipfs/boxo/pull/829), [#833](https://github.com/ipfs/boxo/pull/833)
2626

2727
### Security
2828

bitswap/client/internal/session/session.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -437,9 +437,9 @@ func (s *Session) handleReceive(ks []cid.Cid) {
437437
// Record latency
438438
s.latencyTrkr.receiveUpdate(len(wanted), totalLatency)
439439

440-
// Inform the SessionInterestManager that this session is no longer
441-
// expecting to receive the wanted keys
442-
s.sim.RemoveSessionWants(s.id, wanted)
440+
// Inform the SessionManager that this session is no longer expecting to
441+
// receive the wanted keys, since we now have them
442+
s.sm.CancelSessionWants(s.id, wanted)
443443

444444
s.idleTick.Stop()
445445

bitswap/client/internal/session/session_test.go

+13-9
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,15 @@ const blockSize = 4
2424

2525
type mockSessionMgr struct {
2626
lk sync.Mutex
27+
sim *bssim.SessionInterestManager
2728
removeSession bool
2829
cancels []cid.Cid
2930
}
3031

31-
func newMockSessionMgr() *mockSessionMgr {
32-
return &mockSessionMgr{}
32+
func newMockSessionMgr(sim *bssim.SessionInterestManager) *mockSessionMgr {
33+
return &mockSessionMgr{
34+
sim: sim,
35+
}
3336
}
3437

3538
func (msm *mockSessionMgr) removeSessionCalled() bool {
@@ -54,6 +57,7 @@ func (msm *mockSessionMgr) CancelSessionWants(sid uint64, wants []cid.Cid) {
5457
msm.lk.Lock()
5558
defer msm.lk.Unlock()
5659
msm.cancels = append(msm.cancels, wants...)
60+
msm.sim.RemoveSessionWants(sid, wants)
5761
}
5862

5963
func newFakeSessionPeerManager() *bsspm.SessionPeerManager {
@@ -164,7 +168,7 @@ func TestSessionGetBlocks(t *testing.T) {
164168
notif := notifications.New()
165169
defer notif.Shutdown()
166170
id := random.SequenceNext()
167-
sm := newMockSessionMgr()
171+
sm := newMockSessionMgr(sim)
168172
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
169173
blks := random.BlocksOfSize(broadcastLiveWantsLimit*2, blockSize)
170174
var cids []cid.Cid
@@ -244,7 +248,7 @@ func TestSessionFindMorePeers(t *testing.T) {
244248
notif := notifications.New()
245249
defer notif.Shutdown()
246250
id := random.SequenceNext()
247-
sm := newMockSessionMgr()
251+
sm := newMockSessionMgr(sim)
248252
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
249253
session.SetBaseTickDelay(200 * time.Microsecond)
250254
blks := random.BlocksOfSize(broadcastLiveWantsLimit*2, blockSize)
@@ -314,7 +318,7 @@ func TestSessionOnPeersExhausted(t *testing.T) {
314318
notif := notifications.New()
315319
defer notif.Shutdown()
316320
id := random.SequenceNext()
317-
sm := newMockSessionMgr()
321+
sm := newMockSessionMgr(sim)
318322
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
319323
blks := random.BlocksOfSize(broadcastLiveWantsLimit+5, blockSize)
320324
var cids []cid.Cid
@@ -351,7 +355,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
351355
notif := notifications.New()
352356
defer notif.Shutdown()
353357
id := random.SequenceNext()
354-
sm := newMockSessionMgr()
358+
sm := newMockSessionMgr(sim)
355359
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
356360
blks := random.BlocksOfSize(4, blockSize)
357361
var cids []cid.Cid
@@ -449,7 +453,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
449453
notif := notifications.New()
450454
defer notif.Shutdown()
451455
id := random.SequenceNext()
452-
sm := newMockSessionMgr()
456+
sm := newMockSessionMgr(sim)
453457

454458
// Create a new session with its own context
455459
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
@@ -492,7 +496,7 @@ func TestSessionOnShutdownCalled(t *testing.T) {
492496
notif := notifications.New()
493497
defer notif.Shutdown()
494498
id := random.SequenceNext()
495-
sm := newMockSessionMgr()
499+
sm := newMockSessionMgr(sim)
496500

497501
// Create a new session with its own context
498502
sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
@@ -519,7 +523,7 @@ func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) {
519523
notif := notifications.New()
520524
defer notif.Shutdown()
521525
id := random.SequenceNext()
522-
sm := newMockSessionMgr()
526+
sm := newMockSessionMgr(sim)
523527
session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
524528
blks := random.BlocksOfSize(2, blockSize)
525529
cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}

bitswap/client/internal/session/sessionwantsender_test.go

+29-14
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
bsbpm "github.com/ipfs/boxo/bitswap/client/internal/blockpresencemanager"
1010
bspm "github.com/ipfs/boxo/bitswap/client/internal/peermanager"
11+
bssim "github.com/ipfs/boxo/bitswap/client/internal/sessioninterestmanager"
1112
bsspm "github.com/ipfs/boxo/bitswap/client/internal/sessionpeermanager"
1213
cid "github.com/ipfs/go-cid"
1314
"github.com/ipfs/go-test/random"
@@ -147,7 +148,8 @@ func TestSendWants(t *testing.T) {
147148
const sid = uint64(1)
148149
pm := newMockPeerManager()
149150
fpm := newFakeSessionPeerManager()
150-
swc := newMockSessionMgr()
151+
sim := bssim.New()
152+
swc := newMockSessionMgr(sim)
151153
bpm := bsbpm.New()
152154
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
153155
onPeersExhausted := func([]cid.Cid) {}
@@ -181,7 +183,8 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
181183
const sid = uint64(1)
182184
pm := newMockPeerManager()
183185
fpm := newFakeSessionPeerManager()
184-
swc := newMockSessionMgr()
186+
sim := bssim.New()
187+
swc := newMockSessionMgr(sim)
185188
bpm := bsbpm.New()
186189
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
187190
onPeersExhausted := func([]cid.Cid) {}
@@ -231,7 +234,8 @@ func TestReceiveBlock(t *testing.T) {
231234
const sid = uint64(1)
232235
pm := newMockPeerManager()
233236
fpm := newFakeSessionPeerManager()
234-
swc := newMockSessionMgr()
237+
sim := bssim.New()
238+
swc := newMockSessionMgr(sim)
235239
bpm := bsbpm.New()
236240
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
237241
onPeersExhausted := func([]cid.Cid) {}
@@ -284,7 +288,8 @@ func TestCancelWants(t *testing.T) {
284288
const sid = uint64(1)
285289
pm := newMockPeerManager()
286290
fpm := newFakeSessionPeerManager()
287-
swc := newMockSessionMgr()
291+
sim := bssim.New()
292+
swc := newMockSessionMgr(sim)
288293
bpm := bsbpm.New()
289294
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
290295
onPeersExhausted := func([]cid.Cid) {}
@@ -319,7 +324,8 @@ func TestRegisterSessionWithPeerManager(t *testing.T) {
319324
const sid = uint64(1)
320325
pm := newMockPeerManager()
321326
fpm := newFakeSessionPeerManager()
322-
swc := newMockSessionMgr()
327+
sim := bssim.New()
328+
swc := newMockSessionMgr(sim)
323329
bpm := bsbpm.New()
324330
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
325331
onPeersExhausted := func([]cid.Cid) {}
@@ -357,7 +363,8 @@ func TestProtectConnFirstPeerToSendWantedBlock(t *testing.T) {
357363
pm := newMockPeerManager()
358364
fpt := newFakePeerTagger()
359365
fpm := bsspm.New(1, fpt)
360-
swc := newMockSessionMgr()
366+
sim := bssim.New()
367+
swc := newMockSessionMgr(sim)
361368
bpm := bsbpm.New()
362369
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
363370
onPeersExhausted := func([]cid.Cid) {}
@@ -411,7 +418,8 @@ func TestPeerUnavailable(t *testing.T) {
411418
const sid = uint64(1)
412419
pm := newMockPeerManager()
413420
fpm := newFakeSessionPeerManager()
414-
swc := newMockSessionMgr()
421+
sim := bssim.New()
422+
swc := newMockSessionMgr(sim)
415423
bpm := bsbpm.New()
416424
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
417425
onPeersExhausted := func([]cid.Cid) {}
@@ -470,7 +478,8 @@ func TestPeersExhausted(t *testing.T) {
470478
const sid = uint64(1)
471479
pm := newMockPeerManager()
472480
fpm := newFakeSessionPeerManager()
473-
swc := newMockSessionMgr()
481+
sim := bssim.New()
482+
swc := newMockSessionMgr(sim)
474483
bpm := bsbpm.New()
475484
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
476485

@@ -543,7 +552,8 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
543552
const sid = uint64(1)
544553
pm := newMockPeerManager()
545554
fpm := newFakeSessionPeerManager()
546-
swc := newMockSessionMgr()
555+
sim := bssim.New()
556+
swc := newMockSessionMgr(sim)
547557
bpm := bsbpm.New()
548558
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
549559

@@ -590,7 +600,8 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
590600
const sid = uint64(1)
591601
pm := newMockPeerManager()
592602
fpm := newFakeSessionPeerManager()
593-
swc := newMockSessionMgr()
603+
sim := bssim.New()
604+
swc := newMockSessionMgr(sim)
594605
bpm := bsbpm.New()
595606
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
596607

@@ -628,7 +639,8 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
628639
const sid = uint64(1)
629640
pm := newMockPeerManager()
630641
fpm := newFakeSessionPeerManager()
631-
swc := newMockSessionMgr()
642+
sim := bssim.New()
643+
swc := newMockSessionMgr(sim)
632644
bpm := bsbpm.New()
633645
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
634646
onPeersExhausted := func([]cid.Cid) {}
@@ -680,7 +692,8 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
680692
const sid = uint64(1)
681693
pm := newMockPeerManager()
682694
fpm := newFakeSessionPeerManager()
683-
swc := newMockSessionMgr()
695+
sim := bssim.New()
696+
swc := newMockSessionMgr(sim)
684697
bpm := bsbpm.New()
685698
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
686699
onPeersExhausted := func([]cid.Cid) {}
@@ -733,7 +746,8 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
733746
const sid = uint64(1)
734747
pm := newMockPeerManager()
735748
fpm := newFakeSessionPeerManager()
736-
swc := newMockSessionMgr()
749+
sim := bssim.New()
750+
swc := newMockSessionMgr(sim)
737751
bpm := bsbpm.New()
738752
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
739753
onPeersExhausted := func([]cid.Cid) {}
@@ -809,7 +823,8 @@ func TestConsecutiveDontHaveDontRemoveIfHasWantedBlock(t *testing.T) {
809823
const sid = uint64(1)
810824
pm := newMockPeerManager()
811825
fpm := newFakeSessionPeerManager()
812-
swc := newMockSessionMgr()
826+
sim := bssim.New()
827+
swc := newMockSessionMgr(sim)
813828
bpm := bsbpm.New()
814829
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
815830
onPeersExhausted := func([]cid.Cid) {}

bitswap/client/internal/sessionmanager/sessionmanager.go

-5
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,6 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
157157
// their contents. If the caller needs to preserve a copy of the lists it
158158
// should make a copy before calling ReceiveFrom.
159159
func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
160-
// Send CANCEL to all peers with want-have / want-block. This needs to be
161-
// done before filtering out CIDs that peers are no longer interested in,
162-
// to ensure they are removed from PeerManager and PeerQueue want lists.
163-
sm.peerManager.SendCancels(ctx, blks)
164-
165160
// Keep only the keys that at least one session wants
166161
keys := sm.sessionInterestManager.FilterInterests(blks, haves, dontHaves)
167162
blks = keys[0]

bitswap/client/internal/sessionmanager/sessionmanager_test.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func (fs *fakeSession) ReceiveFrom(p peer.ID, ks []cid.Cid, wantBlocks []cid.Cid
4545
fs.ks = append(fs.ks, ks...)
4646
fs.wantBlocks = append(fs.wantBlocks, wantBlocks...)
4747
fs.wantHaves = append(fs.wantHaves, wantHaves...)
48+
fs.sm.CancelSessionWants(fs.id, ks)
4849
}
4950

5051
func (fs *fakeSession) Shutdown() {
@@ -132,13 +133,6 @@ func TestReceiveFrom(t *testing.T) {
132133
sim.RecordSessionInterest(firstSession.ID(), []cid.Cid{block.Cid()})
133134
sim.RecordSessionInterest(thirdSession.ID(), []cid.Cid{block.Cid()})
134135

135-
sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
136-
if len(firstSession.ks) == 0 ||
137-
len(secondSession.ks) > 0 ||
138-
len(thirdSession.ks) == 0 {
139-
t.Fatal("should have received blocks but didn't")
140-
}
141-
142136
sm.ReceiveFrom(ctx, p, []cid.Cid{}, []cid.Cid{block.Cid()}, []cid.Cid{})
143137
if len(firstSession.wantBlocks) == 0 ||
144138
len(secondSession.wantBlocks) > 0 ||
@@ -153,6 +147,13 @@ func TestReceiveFrom(t *testing.T) {
153147
t.Fatal("should have received want-haves but didn't")
154148
}
155149

150+
sm.ReceiveFrom(ctx, p, []cid.Cid{block.Cid()}, []cid.Cid{}, []cid.Cid{})
151+
if len(firstSession.ks) == 0 ||
152+
len(secondSession.ks) > 0 ||
153+
len(thirdSession.ks) == 0 {
154+
t.Fatal("should have received blocks but didn't")
155+
}
156+
156157
require.Len(t, pm.cancelled(), 1, "should have sent cancel for received blocks")
157158
}
158159

0 commit comments

Comments
 (0)