Skip to content
This repository was archived by the owner on Jul 11, 2023. It is now read-only.

remove head of line blocking from workerpool #4648

Merged
merged 1 commit into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 23 additions & 55 deletions pkg/workerpool/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,21 @@ package workerpool
import (
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/openservicemesh/osm/pkg/logger"
)

const (
// Size of the job queue per worker
maxJobPerWorker = 4096
)

var (
log = logger.New("workerpool")
)

// worker context for a worker routine
type worker struct {
id int
jobs chan Job // Job queue
stop chan struct{} // Stop channel
wg *sync.WaitGroup // Pointer to WorkerPool wg
jobsProcessed uint64 // Jobs processed by this worker
}

// WorkerPool object representation
type WorkerPool struct {
wg sync.WaitGroup // Sync group, to stop workers if needed
workerContext []*worker // Worker contexts
nWorkers uint64 // Number of workers. Uint64 for easier mod hash later
rRobinCounter uint64 // Used only by the round robin api. Modified atomically on API.
wg sync.WaitGroup // Sync group, to stop workers if needed
nWorkers uint64 // Number of workers. Uint64 for easier mod hash later
jobs chan Job
stop chan struct{} // Stop channel
}

// Job is a runnable interface to queue jobs on a WorkerPool
Expand Down Expand Up @@ -62,72 +47,55 @@ func NewWorkerPool(nWorkers int) *WorkerPool {

log.Info().Msgf("New worker pool setting up %d workers", nWorkers)

var workPool WorkerPool
workPool := &WorkerPool{
nWorkers: uint64(nWorkers),
jobs: make(chan Job, nWorkers),
stop: make(chan struct{}),
}
for i := 0; i < nWorkers; i++ {
workPool.workerContext = append(workPool.workerContext,
&worker{
id: i,
jobs: make(chan Job, maxJobPerWorker),
stop: make(chan struct{}, 1),
wg: &workPool.wg,
jobsProcessed: 0,
},
)
i := i
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this line do?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workPool.wg.Add(1)
workPool.nWorkers++

go (workPool.workerContext[i]).work()
go workPool.work(i)
}

return &workPool
return workPool
}

// AddJob posts the job on a worker queue
// Uses Hash underneath to choose worker to post the job to
func (wp *WorkerPool) AddJob(job Job) <-chan struct{} {
wp.workerContext[job.Hash()%wp.nWorkers].jobs <- job
wp.jobs <- job
return job.GetDoneCh()
}

// AddJobRoundRobin adds a job in round robin to the queues
// Concurrent calls to AddJobRoundRobin are thread safe and fair
// between each other
func (wp *WorkerPool) AddJobRoundRobin(jobs Job) {
added := atomic.AddUint64(&wp.rRobinCounter, 1)
wp.workerContext[added%wp.nWorkers].jobs <- jobs
}

// GetWorkerNumber gets the number of queues/workers
func (wp *WorkerPool) GetWorkerNumber() int {
return int(wp.nWorkers)
}

// Stop stops the workerpool
func (wp *WorkerPool) Stop() {
for _, worker := range wp.workerContext {
worker.stop <- struct{}{}
}
close(wp.stop)
wp.wg.Wait()
}

func (workContext *worker) work() {
defer workContext.wg.Done()
func (wp *WorkerPool) work(id int) {
defer wp.wg.Done()

log.Info().Msgf("Worker %d running", id)

log.Info().Msgf("Worker %d running", workContext.id)
for {
select {
case j := <-workContext.jobs:
case j := <-wp.jobs:
t := time.Now()
log.Debug().Msgf("work[%d]: Starting %v", workContext.id, j.JobName())
log.Debug().Msgf("work[%d]: Starting %v", id, j.JobName())

// Run current job
j.Run()

log.Debug().Msgf("work[%d][%s] : took %v", workContext.id, j.JobName(), time.Since(t))
workContext.jobsProcessed++

case <-workContext.stop:
log.Debug().Msgf("work[%d]: Stopped", workContext.id)
log.Debug().Msgf("work[%d][%s] : took %v", id, j.JobName(), time.Since(t))
case <-wp.stop:
log.Debug().Msgf("work[%d]: Stopped", id)
return
}
}
Expand Down
39 changes: 0 additions & 39 deletions pkg/workerpool/workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ func (tj *testJob) Hash() uint64 {

// Uses AddJob, which relies on job hash for queue assignment
func TestAddJob(t *testing.T) {
assert := tassert.New(t)

njobs := 10 // also worker routines
wp := NewWorkerPool(njobs)
joblist := make([]testJob, njobs)
Expand All @@ -66,41 +64,4 @@ func TestAddJob(t *testing.T) {
}

wp.Stop()

// Verify all the workers processed 1 job (as expected by the static hash)
for i := 0; i < njobs; i++ {
assert.Equal(uint64(1), wp.workerContext[i].jobsProcessed)
}
}

// Uses AddJobRoundRobin, which relies on round robin for queue assignment
func TestAddJobRoundRobin(t *testing.T) {
assert := tassert.New(t)

njobs := 10 // also worker routines
wp := NewWorkerPool(njobs)
joblist := make([]testJob, njobs)

// Create and add jobs
for i := 0; i < njobs; i++ {
joblist[i] = testJob{
jobDone: make(chan struct{}, 1),
hash: uint64(i),
}

wp.AddJobRoundRobin(&joblist[i])
}

// Verify all jobs ran through the workers
for i := 0; i < njobs; i++ {
<-joblist[i].jobDone
}

wp.Stop()

// Verify all the workers processed 1 job (round-robined)
assert.Equal(uint64(njobs), wp.rRobinCounter)
for i := 0; i < njobs; i++ {
assert.Equal(uint64(1), wp.workerContext[i].jobsProcessed)
}
}