Commit 19bcc75

fix: bitswap performance issue (#692)
- Fix exhausted wants problem resulting in a possible performance issue.
- Minor improvements for GC.
- RWLock not justified for time reading: replace unneeded RWMutex with Mutex.
- Build strings with strings.Builder.
1 parent 043c71c commit 19bcc75

File tree

9 files changed: 32 additions & 27 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -26,6 +26,7 @@ The following emojis are used to highlight certain changes:
 
 - `routing/http/client`: optional address and protocol filter parameters from [IPIP-484](https://github.com/ipfs/specs/pull/484) use human-readable `,` instead of `%2C`. [#688](https://github.com/ipfs/boxo/pull/688)
 - `bitswap/client` Cleanup live wants when wants are canceled. This prevents live wants from continuing to get rebroadcasted even after the wants are canceled. [#690](https://github.com/ipfs/boxo/pull/690)
+- Fix problem adding invalid CID to exhausted wants list resulting in possible performance issue. [#692](https://github.com/ipfs/boxo/pull/692)
 
 ### Security
 
```
bitswap/client/internal/messagequeue/messagequeue.go

Lines changed: 6 additions & 6 deletions
```diff
@@ -93,7 +93,7 @@ type MessageQueue struct {
 
     // Dont touch any of these variables outside of run loop
     sender                bsnet.MessageSender
-    rebroadcastIntervalLk sync.RWMutex
+    rebroadcastIntervalLk sync.Mutex
     rebroadcastInterval   time.Duration
     rebroadcastTimer      *clock.Timer
     // For performance reasons we just clear out the fields of the message
@@ -389,9 +389,9 @@ func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {
 
 // Startup starts the processing of messages and rebroadcasting.
 func (mq *MessageQueue) Startup() {
-    mq.rebroadcastIntervalLk.RLock()
+    mq.rebroadcastIntervalLk.Lock()
     mq.rebroadcastTimer = mq.clock.Timer(mq.rebroadcastInterval)
-    mq.rebroadcastIntervalLk.RUnlock()
+    mq.rebroadcastIntervalLk.Unlock()
     go mq.runQueue()
 }
 
@@ -422,7 +422,7 @@ func (mq *MessageQueue) runQueue() {
     }
 
     var workScheduled time.Time
-    for mq.ctx.Err() == nil {
+    for {
         select {
         case <-mq.rebroadcastTimer.C:
             mq.rebroadcastWantlist()
@@ -471,9 +471,9 @@
 
 // Periodically resend the list of wants to the peer
 func (mq *MessageQueue) rebroadcastWantlist() {
-    mq.rebroadcastIntervalLk.RLock()
+    mq.rebroadcastIntervalLk.Lock()
     mq.rebroadcastTimer.Reset(mq.rebroadcastInterval)
-    mq.rebroadcastIntervalLk.RUnlock()
+    mq.rebroadcastIntervalLk.Unlock()
 
     // If some wants were transferred from the rebroadcast list
     if mq.transferRebroadcastWants() {
```
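The lock here only guards a brief read or reset of `rebroadcastInterval`, so the diff downgrades the `sync.RWMutex` to a plain `sync.Mutex`; the same change is applied to `PeerManager` and `SessionManager` further down. A minimal standalone sketch of the pattern, using hypothetical names rather than boxo's actual types:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// queue is a stand-in for MessageQueue: the interval is only touched inside
// tiny critical sections, so a plain Mutex suffices and avoids the extra
// bookkeeping an RWMutex carries.
type queue struct {
	mu       sync.Mutex // was sync.RWMutex in the pre-fix code
	interval time.Duration
}

func (q *queue) setInterval(d time.Duration) {
	q.mu.Lock()
	q.interval = d
	q.mu.Unlock()
}

func (q *queue) currentInterval() time.Duration {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.interval
}

func main() {
	q := &queue{interval: 30 * time.Second}
	q.setInterval(time.Minute)
	fmt.Println(q.currentInterval()) // 1m0s
}
```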

bitswap/client/internal/notifications/notifications.go

Lines changed: 2 additions & 1 deletion
```diff
@@ -69,12 +69,13 @@ func (ps *impl) Shutdown() {
 // corresponding to |keys|.
 func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block {
     blocksCh := make(chan blocks.Block, len(keys))
-    valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
     if len(keys) == 0 {
         close(blocksCh)
         return blocksCh
     }
 
+    valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
+
     // prevent shutdown
     ps.lk.RLock()
     defer ps.lk.RUnlock()
```
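The only change here is ordering: `valuesCh` is now allocated after the empty-keys early return, so a `Subscribe` call with no keys allocates nothing beyond the closed result channel. A rough illustration of the allocate-after-validating idea, with placeholder types rather than the real notifications API:

```go
package main

import "fmt"

// subscribe sketches the allocate-after-validating ordering: the empty-input
// path returns a closed channel without creating the internal buffer.
func subscribe(keys []string) <-chan string {
	out := make(chan string, len(keys))
	if len(keys) == 0 {
		close(out)
		return out
	}

	// Internal buffer, allocated only when there is real work to do
	// (mirrors the moved valuesCh allocation above).
	values := make(chan string, len(keys))
	for _, k := range keys {
		values <- k // buffered to len(keys), so this never blocks
	}
	close(values)
	for v := range values {
		out <- v // out is also buffered to len(keys)
	}
	close(out)
	return out
}

func main() {
	for v := range subscribe([]string{"a", "b"}) {
		fmt.Println(v)
	}
}
```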

bitswap/client/internal/peermanager/peermanager.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -42,7 +42,7 @@ type PeerManager struct {
     createPeerQueue PeerQueueFactory
     ctx             context.Context
 
-    psLk         sync.RWMutex
+    psLk         sync.Mutex
     sessions     map[uint64]Session
     peerSessions map[peer.ID]map[uint64]struct{}
 
```
bitswap/client/internal/peermanager/peerwantmanager.go

Lines changed: 6 additions & 4 deletions
```diff
@@ -1,8 +1,8 @@
 package peermanager
 
 import (
-    "bytes"
     "fmt"
+    "strings"
 
     cid "github.com/ipfs/go-cid"
     peer "github.com/libp2p/go-libp2p/core/peer"
@@ -158,8 +158,6 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
 // sendWants only sends the peer the want-blocks and want-haves that have not
 // already been sent to it.
 func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
-    fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
-    fltWantHvs := make([]cid.Cid, 0, len(wantHaves))
 
     // Get the existing want-blocks and want-haves for the peer
     pws, ok := pwm.peerWants[p]
@@ -169,6 +167,8 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
         return
     }
 
+    fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
+
     // Iterate over the requested want-blocks
     for _, c := range wantBlocks {
         // If the want-block hasn't been sent to the peer
@@ -198,6 +198,8 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
         pwm.reverseIndexAdd(c, p)
     }
 
+    fltWantHvs := make([]cid.Cid, 0, len(wantHaves))
+
     // Iterate over the requested want-haves
     for _, c := range wantHaves {
         // If we've already broadcasted this want, don't bother with a
@@ -450,7 +452,7 @@ func (pwm *peerWantManager) getWants() []cid.Cid {
 }
 
 func (pwm *peerWantManager) String() string {
-    var b bytes.Buffer
+    var b strings.Builder
     for p, ws := range pwm.peerWants {
         b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
         for _, c := range ws.wantHaves.Keys() {
```
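Swapping `bytes.Buffer` for `strings.Builder` in `String()` avoids the final copy that `bytes.Buffer.String()` performs, since `strings.Builder` returns its accumulated text directly. A small self-contained sketch of the same pattern (the wants map and its shape are made up for illustration):

```go
package main

import (
	"fmt"
	"strings"
)

// describeWants builds a multi-line summary with strings.Builder; Builder's
// String() hands back the accumulated text without copying the bytes again.
func describeWants(wants map[string]int) string {
	var b strings.Builder
	for peer, n := range wants {
		b.WriteString(fmt.Sprintf("Peer %s: %d want(s)\n", peer, n))
	}
	return b.String()
}

func main() {
	fmt.Print(describeWants(map[string]int{"peerA": 2, "peerB": 5}))
}
```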

bitswap/client/internal/session/peerresponsetracker.go

Lines changed: 2 additions & 4 deletions
```diff
@@ -31,8 +31,6 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID {
         return ""
     }
 
-    rnd := rand.Float64()
-
     // Find the total received blocks for all candidate peers
     total := 0
     for _, p := range peers {
@@ -41,6 +39,7 @@
 
     // Choose one of the peers with a chance proportional to the number
     // of blocks received from that peer
+    rnd := rand.Float64()
     counted := 0.0
     for _, p := range peers {
         counted += float64(prt.getPeerCount(p)) / float64(total)
@@ -52,8 +51,7 @@
     // We shouldn't get here unless there is some weirdness with floating point
     // math that doesn't quite cover the whole range of peers in the for loop
     // so just choose the last peer.
-    index := len(peers) - 1
-    return peers[index]
+    return peers[len(peers)-1]
 }
 
 // getPeerCount returns the number of times the peer was first to send us a
```
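This hunk only moves `rnd := rand.Float64()` next to its use and collapses the final return, but the surrounding `choose` implements weighted random peer selection: each candidate is picked with probability proportional to the blocks it has delivered. A simplified, self-contained sketch of that selection (the `counts` map and the `+1` smoothing are assumptions of the sketch, not necessarily how `getPeerCount` behaves):

```go
package main

import (
	"fmt"
	"math/rand"
)

// choose picks a peer with probability proportional to its block count.
func choose(peers []string, counts map[string]int) string {
	if len(peers) == 0 {
		return ""
	}
	total := 0
	for _, p := range peers {
		total += counts[p] + 1 // +1 keeps the total nonzero and gives new peers a chance
	}
	rnd := rand.Float64()
	counted := 0.0
	for _, p := range peers {
		counted += float64(counts[p]+1) / float64(total)
		if rnd < counted {
			return p
		}
	}
	// Floating point rounding can leave rnd just above the final cumulative
	// sum; fall back to the last peer, as the original code does.
	return peers[len(peers)-1]
}

func main() {
	counts := map[string]int{"a": 3, "b": 1, "c": 0}
	fmt.Println(choose([]string{"a", "b", "c"}, counts))
}
```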

bitswap/client/internal/session/sessionwants.go

Lines changed: 7 additions & 2 deletions
```diff
@@ -56,8 +56,12 @@ func (sw *sessionWants) GetNextWants() []cid.Cid {
     // limit)
     currentLiveCount := len(sw.liveWants)
     toAdd := sw.broadcastLimit - currentLiveCount
+    liveSize := min(toAdd, sw.toFetch.Len())
+    if liveSize == 0 {
+        return nil
+    }
 
-    var live []cid.Cid
+    live := make([]cid.Cid, 0, liveSize)
     for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
         c := sw.toFetch.Pop()
         live = append(live, c)
@@ -117,6 +121,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
             cleaned = append(cleaned, c)
         }
     }
+    clear(sw.liveWantsOrder[len(cleaned):]) // GC cleared items
     sw.liveWantsOrder = cleaned
 }
 
@@ -127,7 +132,7 @@
 // live want CIDs up to the broadcast limit.
 func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
     now := time.Now()
-    live := make([]cid.Cid, 0, len(sw.liveWants))
+    live := make([]cid.Cid, 0, min(len(sw.liveWants), sw.broadcastLimit))
     for _, c := range sw.liveWantsOrder {
         if _, ok := sw.liveWants[c]; ok {
             // No response was received for the want, so reset the sent time
```
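Two GC-oriented tweaks here: `GetNextWants` now preallocates `live` and bails out early when there is nothing to fetch, and `BlocksReceived` clears the tail of `liveWantsOrder` after compacting it so the dropped entries no longer pin memory. A sketch of the compact-and-clear-the-tail pattern, with plain strings standing in for `cid.Cid`; it assumes the compaction reuses the original backing array, which is what makes the `clear` worthwhile:

```go
package main

import "fmt"

// compact keeps only the wanted elements, reusing the original backing array,
// then zeroes the stale tail so the garbage collector can reclaim whatever
// those slots still reference.
func compact(order []string, keep map[string]bool) []string {
	cleaned := order[:0] // filter in place over the same backing array
	for _, c := range order {
		if keep[c] {
			cleaned = append(cleaned, c)
		}
	}
	clear(order[len(cleaned):]) // zero the leftover slots (Go 1.21+ builtin)
	return cleaned
}

func main() {
	order := []string{"a", "b", "c", "d"}
	fmt.Println(compact(order, map[string]bool{"a": true, "c": true})) // [a c]
}
```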

bitswap/client/internal/session/sessionwantsender.go

Lines changed: 3 additions & 5 deletions
```diff
@@ -161,8 +161,7 @@ func (sws *sessionWantSender) Cancel(ks []cid.Cid) {
 // Update is called when the session receives a message with incoming blocks
 // or HAVE / DONT_HAVE
 func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
-    hasUpdate := len(ks) > 0 || len(haves) > 0 || len(dontHaves) > 0
-    if !hasUpdate {
+    if len(ks) == 0 && len(haves) == 0 && len(dontHaves) == 0 {
         return
     }
 
@@ -349,8 +348,7 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) {
     }
 
     // Create the want info
-    wi := newWantInfo(sws.peerRspTrkr)
-    sws.wants[c] = wi
+    sws.wants[c] = newWantInfo(sws.peerRspTrkr)
 
     // For each available peer, register any information we know about
     // whether the peer has the block
@@ -481,7 +479,7 @@ func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU
     // (because it may be the last peer who hadn't sent a DONT_HAVE for a CID)
     if len(newlyUnavailable) > 0 {
         // Collect all pending wants
-        wants = make([]cid.Cid, len(sws.wants))
+        wants = make([]cid.Cid, 0, len(sws.wants))
         for c := range sws.wants {
             wants = append(wants, c)
         }
```
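This is the fix the commit title refers to: the old `make([]cid.Cid, len(sws.wants))` created a slice already populated with `len(sws.wants)` zero-value (invalid) CIDs, and the subsequent `append` calls added the real keys after them, so the invalid CIDs ended up on the exhausted-wants path. Preallocating capacity instead of length removes them. A tiny standalone demonstration of the length-versus-capacity difference, using strings in place of `cid.Cid`:

```go
package main

import "fmt"

func main() {
	pending := map[string]struct{}{"cid1": {}, "cid2": {}}

	// Buggy: the slice starts with len(pending) zero-value entries, and the
	// real keys are appended after them.
	buggy := make([]string, len(pending))
	for c := range pending {
		buggy = append(buggy, c)
	}

	// Fixed: zero length, preallocated capacity; only the real keys end up in it.
	fixed := make([]string, 0, len(pending))
	for c := range pending {
		fixed = append(fixed, c)
	}

	fmt.Printf("buggy len=%d %q\n", len(buggy), buggy) // len=4, with two "" entries
	fmt.Printf("fixed len=%d %q\n", len(fixed), fixed) // len=2
}
```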

bitswap/client/internal/sessionmanager/sessionmanager.go

Lines changed: 4 additions & 4 deletions
```diff
@@ -57,7 +57,7 @@ type SessionManager struct {
     notif notifications.PubSub
 
     // Sessions
-    sessLk   sync.RWMutex
+    sessLk   sync.Mutex
     sessions map[uint64]Session
 
     // Session Index
@@ -159,13 +159,13 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid
 
     // Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs
     for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) {
-        sm.sessLk.RLock()
+        sm.sessLk.Lock()
         if sm.sessions == nil { // check if SessionManager was shutdown
-            sm.sessLk.RUnlock()
+            sm.sessLk.Unlock()
             return
         }
         sess, ok := sm.sessions[id]
-        sm.sessLk.RUnlock()
+        sm.sessLk.Unlock()
 
         if ok {
             sess.ReceiveFrom(p, blks, haves, dontHaves)
```
