Skip to content

Commit d3697ef

Browse files
committed
Simplify provide woker goroutines
1 parent d15e952 commit d3697ef

File tree

1 file changed

+26
-46
lines changed

1 file changed

+26
-46
lines changed

bitswap/server/server.go

+26-46
Original file line numberDiff line numberDiff line change
@@ -299,12 +299,7 @@ func (bs *Server) startWorkers(ctx context.Context) {
299299
if bs.provideEnabled {
300300
bs.waitWorkers.Add(1)
301301
go bs.provideCollector(ctx)
302-
303-
// Spawn up multiple workers to handle incoming blocks
304-
// consider increasing number if providing blocks bottlenecks
305-
// file transfers
306-
bs.waitWorkers.Add(1)
307-
go bs.provideWorker(ctx)
302+
bs.startProvideWorkers(ctx)
308303
}
309304
}
310305

@@ -501,48 +496,33 @@ func (bs *Server) provideCollector(ctx context.Context) {
501496
}
502497
}
503498

504-
func (bs *Server) provideWorker(ctx context.Context) {
505-
limit := make(chan struct{}, provideWorkerMax)
506-
defer func() {
507-
// Wait until all limitGoProvide goroutines are done before declaring
508-
// this worker as done.
509-
for i := 0; i < provideWorkerMax; i++ {
510-
limit <- struct{}{}
511-
}
512-
bs.waitWorkers.Done()
513-
}()
514-
515-
limitedGoProvide := func(k cid.Cid, wid int) {
516-
defer func() {
517-
// replace token when done
518-
<-limit
519-
}()
520-
521-
log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
522-
defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)
523-
524-
ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
525-
defer cancel()
526-
527-
if err := bs.network.Provide(ctx, k); err != nil {
528-
log.Warn(err)
529-
}
530-
}
499+
// startProvideWorkers starts provide worker goroutines that provide CID
500+
// supplied by provideCollector.
501+
//
502+
// If providing blocks bottlenecks file transfers then consider increasing
503+
// provideWorkerMax,
504+
func (bs *Server) startProvideWorkers(ctx context.Context) {
505+
bs.waitWorkers.Add(provideWorkerMax)
506+
for id := 0; id < provideWorkerMax; id++ {
507+
go func(wid int) {
508+
defer bs.waitWorkers.Done()
509+
510+
var runCount int
511+
// Read bs.proviudeKeys until closed, when provideCollector exits.
512+
for k := range bs.provideKeys {
513+
runCount++
514+
log.Debugw("Bitswap provider worker start", "ID", wid, "run", runCount, "cid", k)
515+
516+
ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout)
517+
if err := bs.network.Provide(ctx, k); err != nil {
518+
log.Warn(err)
519+
}
520+
cancel()
531521

532-
// worker spawner, reads from bs.provideKeys until it closes, spawning a
533-
// _ratelimited_ number of workers to handle each key.
534-
wid := 2
535-
for k := range bs.provideKeys {
536-
log.Debug("Bitswap.ProvideWorker.Loop")
537-
select {
538-
case limit <- struct{}{}:
539-
go limitedGoProvide(k, wid)
540-
case <-ctx.Done():
541-
return
542-
}
543-
wid++
522+
log.Debugw("Bitswap provider worker done", "ID", wid, "run", runCount, "cid", k)
523+
}
524+
}(id)
544525
}
545-
log.Debug("provideKeys channel closed")
546526
}
547527

548528
func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {

0 commit comments

Comments
 (0)