@@ -9,30 +9,17 @@ import (
9
9
"github.com/openservicemesh/osm/pkg/logger"
10
10
)
11
11
12
- const (
13
- // Size of the job queue per worker
14
- maxJobPerWorker = 4096
15
- )
16
-
17
12
var (
18
13
log = logger .New ("workerpool" )
19
14
)
20
15
21
- // worker context for a worker routine
22
- type worker struct {
23
- id int
24
- jobs chan Job // Job queue
25
- stop chan struct {} // Stop channel
26
- wg * sync.WaitGroup // Pointer to WorkerPool wg
27
- jobsProcessed uint64 // Jobs processed by this worker
28
- }
29
-
30
16
// WorkerPool object representation
31
17
type WorkerPool struct {
32
18
wg sync.WaitGroup // Sync group, to stop workers if needed
33
- workerContext []* worker // Worker contexts
34
19
nWorkers uint64 // Number of workers. Uint64 for easier mod hash later
35
- rRobinCounter uint64 // Used only by the round robin api. Modified atomically on API.
20
+ jobsProcessed uint64 // Jobs processed by this worker
21
+ jobs chan Job
22
+ stop chan struct {} // Stop channel
36
23
}
37
24
38
25
// Job is a runnable interface to queue jobs on a WorkerPool
@@ -62,72 +49,56 @@ func NewWorkerPool(nWorkers int) *WorkerPool {
62
49
63
50
log .Info ().Msgf ("New worker pool setting up %d workers" , nWorkers )
64
51
65
- var workPool WorkerPool
52
+ workPool := & WorkerPool {
53
+ nWorkers : uint64 (nWorkers ),
54
+ jobs : make (chan Job , nWorkers ),
55
+ stop : make (chan struct {}),
56
+ }
66
57
for i := 0 ; i < nWorkers ; i ++ {
67
- workPool .workerContext = append (workPool .workerContext ,
68
- & worker {
69
- id : i ,
70
- jobs : make (chan Job , maxJobPerWorker ),
71
- stop : make (chan struct {}, 1 ),
72
- wg : & workPool .wg ,
73
- jobsProcessed : 0 ,
74
- },
75
- )
76
- workPool .wg .Add (1 )
77
- workPool .nWorkers ++
78
-
79
- go (workPool .workerContext [i ]).work ()
58
+ i := i
59
+ go workPool .work (i )
80
60
}
81
61
82
- return & workPool
62
+ return workPool
83
63
}
84
64
85
65
// AddJob posts the job on a worker queue
86
66
// Uses Hash underneath to choose worker to post the job to
87
67
func (wp * WorkerPool ) AddJob (job Job ) <- chan struct {} {
88
- wp .workerContext [ job . Hash () % wp . nWorkers ]. jobs <- job
68
+ wp .jobs <- job
89
69
return job .GetDoneCh ()
90
70
}
91
71
92
- // AddJobRoundRobin adds a job in round robin to the queues
93
- // Concurrent calls to AddJobRoundRobin are thread safe and fair
94
- // between each other
95
- func (wp * WorkerPool ) AddJobRoundRobin (jobs Job ) {
96
- added := atomic .AddUint64 (& wp .rRobinCounter , 1 )
97
- wp .workerContext [added % wp .nWorkers ].jobs <- jobs
98
- }
99
-
100
72
// GetWorkerNumber get number of queues/workers
101
73
func (wp * WorkerPool ) GetWorkerNumber () int {
102
74
return int (wp .nWorkers )
103
75
}
104
76
105
77
// Stop stops the workerpool
106
78
func (wp * WorkerPool ) Stop () {
107
- for _ , worker := range wp .workerContext {
108
- worker .stop <- struct {}{}
109
- }
79
+ close (wp .stop )
110
80
wp .wg .Wait ()
111
81
}
112
82
113
- func (workContext * worker ) work () {
114
- defer workContext .wg .Done ()
83
+ func (wp * WorkerPool ) work (id int ) {
84
+ wp .wg .Add (1 )
85
+ defer wp .wg .Done ()
86
+
87
+ log .Info ().Msgf ("Worker %d running" , id )
115
88
116
- log .Info ().Msgf ("Worker %d running" , workContext .id )
117
89
for {
118
90
select {
119
- case j := <- workContext .jobs :
91
+ case j := <- wp .jobs :
120
92
t := time .Now ()
121
- log .Debug ().Msgf ("work[%d]: Starting %v" , workContext . id , j .JobName ())
93
+ log .Debug ().Msgf ("work[%d]: Starting %v" , id , j .JobName ())
122
94
123
95
// Run current job
124
96
j .Run ()
125
97
126
- log .Debug ().Msgf ("work[%d][%s] : took %v" , workContext .id , j .JobName (), time .Since (t ))
127
- workContext .jobsProcessed ++
128
-
129
- case <- workContext .stop :
130
- log .Debug ().Msgf ("work[%d]: Stopped" , workContext .id )
98
+ log .Debug ().Msgf ("work[%d][%s] : took %v" , id , j .JobName (), time .Since (t ))
99
+ atomic .AddUint64 (& wp .jobsProcessed , 1 )
100
+ case <- wp .stop :
101
+ log .Debug ().Msgf ("work[%d]: Stopped" , id )
131
102
return
132
103
}
133
104
}
0 commit comments