This repository was archived by the owner on Jul 11, 2023. It is now read-only.

Commit 254b2af

remove head of line blocking from workerpool

Signed-off-by: Sean Teeling <[email protected]>
1 parent 30e5362 · commit 254b2af

File tree

2 files changed: +23 -94 lines changed
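The head-of-line blocking named in the commit message came from the per-worker job queues: AddJob pinned each job to a single worker by hash, so one long-running job stalled every job queued behind it on that worker even while other workers sat idle. The commit collapses the per-worker queues into a single shared channel that every worker receives from, so any idle worker picks up the next job. The two dispatch paths side by side, excerpted from the diff below:

	// Before: the job waits in one specific worker's queue.
	wp.workerContext[job.Hash()%wp.nWorkers].jobs <- job

	// After: whichever worker is free receives the job next.
	wp.jobs <- job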

pkg/workerpool/workerpool.go

+23 -55

@@ -3,36 +3,21 @@ package workerpool
 import (
 	"runtime"
 	"sync"
-	"sync/atomic"
 	"time"

 	"github.com/openservicemesh/osm/pkg/logger"
 )

-const (
-	// Size of the job queue per worker
-	maxJobPerWorker = 4096
-)
-
 var (
 	log = logger.New("workerpool")
 )

-// worker context for a worker routine
-type worker struct {
-	id            int
-	jobs          chan Job        // Job queue
-	stop          chan struct{}   // Stop channel
-	wg            *sync.WaitGroup // Pointer to WorkerPool wg
-	jobsProcessed uint64          // Jobs processed by this worker
-}
-
 // WorkerPool object representation
 type WorkerPool struct {
-	wg            sync.WaitGroup // Sync group, to stop workers if needed
-	workerContext []*worker      // Worker contexts
-	nWorkers      uint64         // Number of workers. Uint64 for easier mod hash later
-	rRobinCounter uint64         // Used only by the round robin api. Modified atomically on API.
+	wg       sync.WaitGroup // Sync group, to stop workers if needed
+	nWorkers uint64         // Number of workers. Uint64 for easier mod hash later
+	jobs     chan Job
+	stop     chan struct{} // Stop channel
 }

 // Job is a runnable interface to queue jobs on a WorkerPool
@@ -62,72 +47,55 @@ func NewWorkerPool(nWorkers int) *WorkerPool {

 	log.Info().Msgf("New worker pool setting up %d workers", nWorkers)

-	var workPool WorkerPool
+	workPool := &WorkerPool{
+		nWorkers: uint64(nWorkers),
+		jobs:     make(chan Job, nWorkers),
+		stop:     make(chan struct{}),
+	}
 	for i := 0; i < nWorkers; i++ {
-		workPool.workerContext = append(workPool.workerContext,
-			&worker{
-				id:            i,
-				jobs:          make(chan Job, maxJobPerWorker),
-				stop:          make(chan struct{}, 1),
-				wg:            &workPool.wg,
-				jobsProcessed: 0,
-			},
-		)
+		i := i
 		workPool.wg.Add(1)
-		workPool.nWorkers++
-
-		go (workPool.workerContext[i]).work()
+		go workPool.work(i)
 	}

-	return &workPool
+	return workPool
 }

 // AddJob posts the job on a worker queue
 // Uses Hash underneath to choose worker to post the job to
 func (wp *WorkerPool) AddJob(job Job) <-chan struct{} {
-	wp.workerContext[job.Hash()%wp.nWorkers].jobs <- job
+	wp.jobs <- job
 	return job.GetDoneCh()
 }

-// AddJobRoundRobin adds a job in round robin to the queues
-// Concurrent calls to AddJobRoundRobin are thread safe and fair
-// between each other
-func (wp *WorkerPool) AddJobRoundRobin(jobs Job) {
-	added := atomic.AddUint64(&wp.rRobinCounter, 1)
-	wp.workerContext[added%wp.nWorkers].jobs <- jobs
-}
-
 // GetWorkerNumber get number of queues/workers
 func (wp *WorkerPool) GetWorkerNumber() int {
 	return int(wp.nWorkers)
 }

 // Stop stops the workerpool
 func (wp *WorkerPool) Stop() {
-	for _, worker := range wp.workerContext {
-		worker.stop <- struct{}{}
-	}
+	close(wp.stop)
 	wp.wg.Wait()
 }

-func (workContext *worker) work() {
-	defer workContext.wg.Done()
+func (wp *WorkerPool) work(id int) {
+	defer wp.wg.Done()
+
+	log.Info().Msgf("Worker %d running", id)

-	log.Info().Msgf("Worker %d running", workContext.id)
 	for {
 		select {
-		case j := <-workContext.jobs:
+		case j := <-wp.jobs:
 			t := time.Now()
-			log.Debug().Msgf("work[%d]: Starting %v", workContext.id, j.JobName())
+			log.Debug().Msgf("work[%d]: Starting %v", id, j.JobName())

 			// Run current job
 			j.Run()

-			log.Debug().Msgf("work[%d][%s] : took %v", workContext.id, j.JobName(), time.Since(t))
-			workContext.jobsProcessed++
-
-		case <-workContext.stop:
-			log.Debug().Msgf("work[%d]: Stopped", workContext.id)
+			log.Debug().Msgf("work[%d][%s] : took %v", id, j.JobName(), time.Since(t))
+		case <-wp.stop:
+			log.Debug().Msgf("work[%d]: Stopped", id)
 			return
 		}
 	}
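For orientation, here is a minimal usage sketch of the reworked pool. It is not part of the commit: it assumes the Job interface consists of the methods the diff exercises (JobName() string, Hash() uint64, GetDoneCh() <-chan struct{}, Run()), and the printJob type and its fields are hypothetical.

	package main

	import (
		"fmt"

		"github.com/openservicemesh/osm/pkg/workerpool"
	)

	// printJob is a hypothetical Job implementation for this sketch.
	type printJob struct {
		name string
		done chan struct{}
	}

	func (p *printJob) JobName() string            { return p.name }
	func (p *printJob) Hash() uint64               { return 0 } // no longer drives queue selection
	func (p *printJob) GetDoneCh() <-chan struct{} { return p.done }
	func (p *printJob) Run() {
		defer close(p.done) // assumption: the job signals its own completion
		fmt.Println("running", p.name)
	}

	func main() {
		wp := workerpool.NewWorkerPool(4) // four workers sharing one job channel
		done := wp.AddJob(&printJob{name: "hello", done: make(chan struct{})})
		<-done // AddJob returns the job's done channel
		wp.Stop()
	}

One behavioral consequence of the rewrite: the shared jobs channel is buffered to nWorkers, so once every worker is busy and the buffer is full, AddJob blocks until a worker frees up, whereas the old per-worker queues each buffered up to 4096 jobs.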

pkg/workerpool/workerpool_test.go

-39

@@ -44,8 +44,6 @@ func (tj *testJob) Hash() uint64 {

 // Uses AddJob, which relies on job hash for queue assignment
 func TestAddJob(t *testing.T) {
-	assert := tassert.New(t)
-
 	njobs := 10 // also worker routines
 	wp := NewWorkerPool(njobs)
 	joblist := make([]testJob, njobs)
@@ -66,41 +64,4 @@ func TestAddJob(t *testing.T) {
 	}

 	wp.Stop()
-
-	// Verify all the workers processed 1 job (as expected by the static hash)
-	for i := 0; i < njobs; i++ {
-		assert.Equal(uint64(1), wp.workerContext[i].jobsProcessed)
-	}
-}
-
-// Uses AddJobRoundRobin, which relies on round robin for queue assignment
-func TestAddJobRoundRobin(t *testing.T) {
-	assert := tassert.New(t)
-
-	njobs := 10 // also worker routines
-	wp := NewWorkerPool(njobs)
-	joblist := make([]testJob, njobs)
-
-	// Create and add jobs
-	for i := 0; i < njobs; i++ {
-		joblist[i] = testJob{
-			jobDone: make(chan struct{}, 1),
-			hash:    uint64(i),
-		}
-
-		wp.AddJobRoundRobin(&joblist[i])
-	}
-
-	// Verify all jobs ran through the workers
-	for i := 0; i < njobs; i++ {
-		<-joblist[i].jobDone
-	}
-
-	wp.Stop()
-
-	// Verify all the workers processed 1 job (round-robbined)
-	assert.Equal(uint64(njobs), wp.rRobinCounter)
-	for i := 0; i < njobs; i++ {
-		assert.Equal(uint64(1), wp.workerContext[i].jobsProcessed)
-	}
 }
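The deleted assertions inspected per-worker state (jobsProcessed, rRobinCounter) that no longer exists. A test aimed at the property the commit actually claims would check that a fast job is not stuck behind a slow one. A minimal sketch in the style of this file, assuming "time" is imported; the sleepJob type and the timing thresholds are hypothetical:

	// sleepJob is a hypothetical helper mirroring testJob's shape.
	type sleepJob struct {
		d    time.Duration
		done chan struct{}
	}

	func (s *sleepJob) JobName() string            { return "sleep" }
	func (s *sleepJob) Hash() uint64               { return 0 }
	func (s *sleepJob) GetDoneCh() <-chan struct{} { return s.done }
	func (s *sleepJob) Run() {
		time.Sleep(s.d)
		s.done <- struct{}{}
	}

	func TestNoHeadOfLineBlocking(t *testing.T) {
		wp := NewWorkerPool(2)

		slow := &sleepJob{d: time.Second, done: make(chan struct{}, 1)}
		fast := &sleepJob{d: 0, done: make(chan struct{}, 1)}
		wp.AddJob(slow)
		wp.AddJob(fast)

		// Under the old per-worker queues, two jobs with the same hash shared
		// one queue, so the fast job would wait out the slow one. With the
		// shared channel, the second worker picks it up immediately.
		select {
		case <-fast.done:
		case <-time.After(500 * time.Millisecond):
			t.Fatal("fast job appears stuck behind the slow job")
		}
		wp.Stop()
	}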
