Skip to content

Commit 6374d5f

Browse files
committed
Merge pull request #860 from jbenet/fix/hasblock-explosion
implement a worker to consolidate HasBlock provide calls into one
2 parents 4b7493e + 34961a5 commit 6374d5f

File tree

2 files changed

+43
-3
lines changed

2 files changed

+43
-3
lines changed

exchange/bitswap/bitswap.go

+3
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
8989
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
9090
process: px,
9191
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
92+
provideKeys: make(chan u.Key),
9293
}
9394
network.SetDelegate(bs)
9495

@@ -124,6 +125,8 @@ type Bitswap struct {
124125
process process.Process
125126

126127
newBlocks chan *blocks.Block
128+
129+
provideKeys chan u.Key
127130
}
128131

129132
type blockRequest struct {

exchange/bitswap/workers.go

+40-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/chuckpreslar/inflect"
77
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
88
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
9+
u "github.com/jbenet/go-ipfs/util"
910
)
1011

1112
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
@@ -24,6 +25,10 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
2425
bs.rebroadcastWorker(ctx)
2526
})
2627

28+
px.Go(func(px process.Process) {
29+
bs.provideCollector(ctx)
30+
})
31+
2732
// Spawn up multiple workers to handle incoming blocks
2833
// consider increasing number if providing blocks bottlenecks
2934
// file transfers
@@ -58,13 +63,13 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
5863
func (bs *Bitswap) provideWorker(ctx context.Context) {
5964
for {
6065
select {
61-
case blk, ok := <-bs.newBlocks:
66+
case k, ok := <-bs.provideKeys:
6267
if !ok {
63-
log.Debug("newBlocks channel closed")
68+
log.Debug("provideKeys channel closed")
6469
return
6570
}
6671
ctx, _ := context.WithTimeout(ctx, provideTimeout)
67-
err := bs.network.Provide(ctx, blk.Key())
72+
err := bs.network.Provide(ctx, k)
6873
if err != nil {
6974
log.Error(err)
7075
}
@@ -74,6 +79,38 @@ func (bs *Bitswap) provideWorker(ctx context.Context) {
7479
}
7580
}
7681

82+
func (bs *Bitswap) provideCollector(ctx context.Context) {
83+
defer close(bs.provideKeys)
84+
var toProvide []u.Key
85+
var nextKey u.Key
86+
var keysOut chan u.Key
87+
88+
for {
89+
select {
90+
case blk, ok := <-bs.newBlocks:
91+
if !ok {
92+
log.Debug("newBlocks channel closed")
93+
return
94+
}
95+
if keysOut == nil {
96+
nextKey = blk.Key()
97+
keysOut = bs.provideKeys
98+
} else {
99+
toProvide = append(toProvide, blk.Key())
100+
}
101+
case keysOut <- nextKey:
102+
if len(toProvide) > 0 {
103+
nextKey = toProvide[0]
104+
toProvide = toProvide[1:]
105+
} else {
106+
keysOut = nil
107+
}
108+
case <-ctx.Done():
109+
return
110+
}
111+
}
112+
}
113+
77114
// TODO ensure only one active request per key
78115
func (bs *Bitswap) clientWorker(parent context.Context) {
79116
defer log.Info("bitswap client worker shutting down...")

0 commit comments

Comments
 (0)