Skip to content

Commit 83aebf1

Browse files
authored
fix: use sync.Cond to handle no-task blocking wait (#299)
Ref: #284
1 parent a511707 commit 83aebf1

File tree

1 file changed

+15
-15
lines changed

1 file changed

+15
-15
lines changed

taskqueue/taskqueue.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package taskqueue
22

33
import (
44
"context"
5-
"sync/atomic"
5+
"sync"
66
"time"
77

88
"github.com/ipfs/go-peertaskqueue"
@@ -33,7 +33,7 @@ type WorkerTaskQueue struct {
3333
cancelFn func()
3434
peerTaskQueue *peertaskqueue.PeerTaskQueue
3535
workSignal chan struct{}
36-
noTaskSignal chan struct{}
36+
noTaskCond *sync.Cond
3737
ticker *time.Ticker
3838
activeTasks int32
3939
}
@@ -46,7 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue {
4646
cancelFn: cancelFn,
4747
peerTaskQueue: peertaskqueue.New(),
4848
workSignal: make(chan struct{}, 1),
49-
noTaskSignal: make(chan struct{}, 1),
49+
noTaskCond: sync.NewCond(&sync.Mutex{}),
5050
ticker: time.NewTicker(thawSpeed),
5151
}
5252
}
@@ -93,13 +93,11 @@ func (tq *WorkerTaskQueue) Shutdown() {
9393
}
9494

9595
func (tq *WorkerTaskQueue) WaitForNoActiveTasks() {
96-
for atomic.LoadInt32(&tq.activeTasks) > 0 {
97-
select {
98-
case <-tq.ctx.Done():
99-
return
100-
case <-tq.noTaskSignal:
101-
}
96+
tq.noTaskCond.L.Lock()
97+
for tq.activeTasks > 0 {
98+
tq.noTaskCond.Wait()
10299
}
100+
tq.noTaskCond.L.Unlock()
103101
}
104102

105103
func (tq *WorkerTaskQueue) worker(executor Executor) {
@@ -118,14 +116,16 @@ func (tq *WorkerTaskQueue) worker(executor Executor) {
118116
}
119117
}
120118
for _, task := range tasks {
121-
atomic.AddInt32(&tq.activeTasks, 1)
119+
tq.noTaskCond.L.Lock()
120+
tq.activeTasks = tq.activeTasks + 1
121+
tq.noTaskCond.L.Unlock()
122122
terminate := executor.ExecuteTask(tq.ctx, pid, task)
123-
if atomic.AddInt32(&tq.activeTasks, -1) == 0 {
124-
select {
125-
case tq.noTaskSignal <- struct{}{}:
126-
default:
127-
}
123+
tq.noTaskCond.L.Lock()
124+
tq.activeTasks = tq.activeTasks - 1
125+
if tq.activeTasks == 0 {
126+
tq.noTaskCond.Broadcast()
128127
}
128+
tq.noTaskCond.L.Unlock()
129129
if terminate {
130130
return
131131
}

0 commit comments

Comments
 (0)