Skip to content

Commit 70a6503

Browse files
committed
fix leak in blockresponsemanager
SessionManager.ReceiveFrom added HAVEs/DONT_HAVEs even if no sessions were intersted in those, so the following could happen: Session requests BLOCK from one peer and HAVE from another. BLOCK is received before HAVE. Session cleans up its interest on BLOCK, but the following HAVE is still added to BlockResponseManager leaving it there forever. The solution is to add a CID to BlockResponseManager only if there a session interested in it.
1 parent b0a7e7e commit 70a6503

File tree

3 files changed

+33
-27
lines changed

3 files changed

+33
-27
lines changed

bitswap/client/internal/session/session.go

-7
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,6 @@ func (s *Session) Shutdown() {
189189

190190
// ReceiveFrom receives incoming blocks from the given peer.
191191
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
192-
// The SessionManager tells each Session about all keys that it may be
193-
// interested in. Here the Session filters the keys to the ones that this
194-
// particular Session is interested in.
195-
interestedRes := s.sim.FilterSessionInterested(s.id, ks, haves, dontHaves)
196-
ks = interestedRes[0]
197-
haves = interestedRes[1]
198-
dontHaves = interestedRes[2]
199192
s.logReceiveFrom(from, ks, haves, dontHaves)
200193

201194
// Inform the session want sender that a message has been received

bitswap/client/internal/sessioninterestmanager/sessioninterestmanager.go

+25-16
Original file line numberDiff line numberDiff line change
@@ -175,27 +175,36 @@ func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]b
175175

176176
// When the SessionManager receives a message it calls InterestedSessions() to
177177
// find out which sessions are interested in the message.
178-
func (sim *SessionInterestManager) InterestedSessions(blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) []uint64 {
178+
func (sim *SessionInterestManager) InterestedSessions(sets ...[]cid.Cid) map[uint64][][]cid.Cid {
179179
sim.lk.RLock()
180180
defer sim.lk.RUnlock()
181181

182-
ks := make([]cid.Cid, 0, len(blks)+len(haves)+len(dontHaves))
183-
ks = append(ks, blks...)
184-
ks = append(ks, haves...)
185-
ks = append(ks, dontHaves...)
186-
187182
// Create a set of sessions that are interested in the keys
188-
sesSet := make(map[uint64]struct{})
189-
for _, c := range ks {
190-
for s := range sim.wants[c] {
191-
sesSet[s] = struct{}{}
183+
sesSet := make(map[uint64][][]cid.Cid)
184+
for i, set := range sets {
185+
for _, c := range set {
186+
sessions, ok := sim.wants[c]
187+
if !ok {
188+
continue
189+
}
190+
191+
for ses, ok := range sessions {
192+
if !ok {
193+
continue
194+
}
195+
196+
if _, ok := sesSet[ses]; !ok {
197+
sessionSets := make([][]cid.Cid, len(sets))
198+
for i, set := range sets {
199+
sets[i] = make([]cid.Cid, 0, len(set))
200+
}
201+
sesSet[ses] = sessionSets
202+
}
203+
204+
sesSet[ses][i] = append(sesSet[ses][i], c)
205+
}
192206
}
193207
}
194208

195-
// Convert the set into a list
196-
ses := make([]uint64, 0, len(sesSet))
197-
for s := range sesSet {
198-
ses = append(ses, s)
199-
}
200-
return ses
209+
return sesSet
201210
}

bitswap/client/internal/sessionmanager/sessionmanager.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,8 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
154154

155155
// ReceiveFrom is called when a new message is received
156156
func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
157-
// Record block presence for HAVE / DONT_HAVE
158-
sm.blockPresenceManager.ReceiveFrom(p, haves, dontHaves)
159-
160157
// Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs
161-
for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) {
158+
for id, keys := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) {
162159
sm.sessLk.Lock()
163160
if sm.sessions == nil { // check if SessionManager was shutdown
164161
sm.sessLk.Unlock()
@@ -168,6 +165,13 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid
168165
sm.sessLk.Unlock()
169166

170167
if ok {
168+
blks = keys[0]
169+
haves = keys[1]
170+
dontHaves = keys[2]
171+
// Record block presence for HAVE / DONT_HAVE
172+
// must be called before Seession.ReceiveFrom
173+
sm.blockPresenceManager.ReceiveFrom(p, haves, dontHaves)
174+
171175
sess.ReceiveFrom(p, blks, haves, dontHaves)
172176
}
173177
}

0 commit comments

Comments
 (0)