From 8937f5fbda7eb780dd41e42938d31ad0729cedde Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 5 Mar 2015 15:18:57 -0800 Subject: [PATCH 1/3] implement a worker to consolidate HasBlock provide calls into one to alieviate memory pressure --- exchange/bitswap/bitswap.go | 3 ++ exchange/bitswap/workers.go | 56 +++++++++++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 3a81015be87..60672d0c315 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -89,6 +89,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, batchRequests: make(chan *blockRequest, sizeBatchRequestChan), process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), + provideKeys: make(chan u.Key), } network.SetDelegate(bs) @@ -124,6 +125,8 @@ type Bitswap struct { process process.Process newBlocks chan *blocks.Block + + provideKeys chan u.Key } type blockRequest struct { diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index da521ef463a..a14b30092c6 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -6,6 +6,7 @@ import ( inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/chuckpreslar/inflect" process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + u "github.com/jbenet/go-ipfs/util" ) func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { @@ -24,6 +25,10 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { bs.rebroadcastWorker(ctx) }) + px.Go(func(px process.Process) { + bs.provideCollector(ctx) + }) + // Spawn up multiple workers to handle incoming blocks // consider increasing number if providing blocks bottlenecks // file transfers @@ -58,13 +63,13 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { func (bs *Bitswap) provideWorker(ctx context.Context) { for { select { - case blk, ok := <-bs.newBlocks: + case k, ok := <-bs.provideKeys: if !ok { - log.Debug("newBlocks channel closed") + log.Debug("provideKeys channel closed") return } ctx, _ := context.WithTimeout(ctx, provideTimeout) - err := bs.network.Provide(ctx, blk.Key()) + err := bs.network.Provide(ctx, k) if err != nil { log.Error(err) } @@ -74,6 +79,51 @@ func (bs *Bitswap) provideWorker(ctx context.Context) { } } +func (bs *Bitswap) provideCollector(ctx context.Context) { + defer close(bs.provideKeys) + var toprovide []u.Key + var nextKey u.Key + + select { + case blk, ok := <-bs.newBlocks: + if !ok { + log.Debug("newBlocks channel closed") + return + } + nextKey = blk.Key() + case <-ctx.Done(): + return + } + + for { + select { + case blk, ok := <-bs.newBlocks: + if !ok { + log.Debug("newBlocks channel closed") + return + } + toprovide = append(toprovide, blk.Key()) + case bs.provideKeys <- nextKey: + if len(toprovide) > 0 { + nextKey = toprovide[0] + toprovide = toprovide[1:] + } else { + select { + case blk, ok := <-bs.newBlocks: + if !ok { + return + } + nextKey = blk.Key() + case <-ctx.Done(): + return + } + } + case <-ctx.Done(): + return + } + } +} + // TODO ensure only one active request per key func (bs *Bitswap) clientWorker(parent context.Context) { defer log.Info("bitswap client worker shutting down...") From bfee4894b4ad734202b14cccb2b718de081f9edc Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 5 Mar 2015 16:27:47 -0800 Subject: [PATCH 2/3] simplify provideCollector --- exchange/bitswap/workers.go | 31 +++++++++---------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index a14b30092c6..f5f6e655363 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -83,17 +83,7 @@ func (bs *Bitswap) provideCollector(ctx context.Context) { defer close(bs.provideKeys) var toprovide []u.Key var nextKey u.Key - - select { - case blk, ok := <-bs.newBlocks: - if !ok { - log.Debug("newBlocks channel closed") - return - } - nextKey = blk.Key() - case <-ctx.Done(): - return - } + var keysOut chan u.Key for { select { @@ -102,21 +92,18 @@ func (bs *Bitswap) provideCollector(ctx context.Context) { log.Debug("newBlocks channel closed") return } - toprovide = append(toprovide, blk.Key()) - case bs.provideKeys <- nextKey: + if keysOut == nil { + nextKey = blk.Key() + keysOut = bs.provideKeys + } else { + toprovide = append(toprovide, blk.Key()) + } + case keysOut <- nextKey: if len(toprovide) > 0 { nextKey = toprovide[0] toprovide = toprovide[1:] } else { - select { - case blk, ok := <-bs.newBlocks: - if !ok { - return - } - nextKey = blk.Key() - case <-ctx.Done(): - return - } + keysOut = nil } case <-ctx.Done(): return From 34961a5aef1538fc11a3eb442fc67f6bf5833619 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 5 Mar 2015 16:37:40 -0800 Subject: [PATCH 3/3] toprovide -> toProvide --- exchange/bitswap/workers.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index f5f6e655363..967c1bc0c9c 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -81,7 +81,7 @@ func (bs *Bitswap) provideWorker(ctx context.Context) { func (bs *Bitswap) provideCollector(ctx context.Context) { defer close(bs.provideKeys) - var toprovide []u.Key + var toProvide []u.Key var nextKey u.Key var keysOut chan u.Key @@ -96,12 +96,12 @@ func (bs *Bitswap) provideCollector(ctx context.Context) { nextKey = blk.Key() keysOut = bs.provideKeys } else { - toprovide = append(toprovide, blk.Key()) + toProvide = append(toProvide, blk.Key()) } case keysOut <- nextKey: - if len(toprovide) > 0 { - nextKey = toprovide[0] - toprovide = toprovide[1:] + if len(toProvide) > 0 { + nextKey = toProvide[0] + toProvide = toProvide[1:] } else { keysOut = nil }