Skip to content
This repository was archived by the owner on Apr 25, 2023. It is now read-only.

Commit 6225d40

Browse files
committed
introduce concurrent option to worker
1 parent 7544386 commit 6225d40

File tree

7 files changed

+47
-25
lines changed

7 files changed

+47
-25
lines changed

pkg/controller/federatedtypeconfig/controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func newController(config *util.ControllerConfig) (*Controller, error) {
9090
stopChannels: make(map[string]chan struct{}),
9191
}
9292

93-
c.worker = util.NewReconcileWorker("federatedtypeconfig", c.reconcile, util.WorkerTiming{})
93+
c.worker = util.NewReconcileWorker("federatedtypeconfig", c.reconcile, util.WorkerOptions{})
9494

9595
// Only watch the KubeFed namespace to ensure
9696
// restrictive authz can be applied to a namespaced

pkg/controller/schedulingmanager/controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func newSchedulingManager(config *util.ControllerConfig) (*SchedulingManager, er
9292
schedulers: util.NewSafeMap(),
9393
}
9494

95-
c.worker = util.NewReconcileWorker("schedulingmanager", c.reconcile, util.WorkerTiming{})
95+
c.worker = util.NewReconcileWorker("schedulingmanager", c.reconcile, util.WorkerOptions{})
9696

9797
var err error
9898
c.store, c.controller, err = util.NewGenericInformer(

pkg/controller/schedulingpreference/controller.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,10 @@ func newSchedulingPreferenceController(config *util.ControllerConfig, scheduling
105105
eventRecorder: recorder,
106106
}
107107

108-
s.worker = util.NewReconcileWorker(strings.ToLower(schedulingType.Kind), s.reconcile, util.WorkerTiming{
109-
ClusterSyncDelay: s.clusterAvailableDelay,
108+
s.worker = util.NewReconcileWorker(strings.ToLower(schedulingType.Kind), s.reconcile, util.WorkerOptions{
109+
WorkerTiming: util.WorkerTiming{
110+
ClusterSyncDelay: s.clusterAvailableDelay,
111+
},
110112
})
111113

112114
eventHandlers := schedulingtypes.SchedulerEventHandlers{

pkg/controller/status/controller.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,10 @@ func newKubeFedStatusController(controllerConfig *util.ControllerConfig, typeCon
123123
fedNamespace: controllerConfig.KubeFedNamespace,
124124
}
125125

126-
s.worker = util.NewReconcileWorker(strings.ToLower(statusAPIResource.Kind), s.reconcile, util.WorkerTiming{
127-
ClusterSyncDelay: s.clusterAvailableDelay,
126+
s.worker = util.NewReconcileWorker(strings.ToLower(statusAPIResource.Kind), s.reconcile, util.WorkerOptions{
127+
WorkerTiming: util.WorkerTiming{
128+
ClusterSyncDelay: s.clusterAvailableDelay,
129+
},
128130
})
129131

130132
// Build deliverer for triggering cluster reconciliations.

pkg/controller/sync/controller.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,10 @@ func newKubeFedSyncController(controllerConfig *util.ControllerConfig, typeConfi
132132
rawResourceStatusCollection: controllerConfig.RawResourceStatusCollection,
133133
}
134134

135-
s.worker = util.NewReconcileWorker(strings.ToLower(federatedTypeAPIResource.Kind), s.reconcile, util.WorkerTiming{
136-
ClusterSyncDelay: s.clusterAvailableDelay,
135+
s.worker = util.NewReconcileWorker(strings.ToLower(federatedTypeAPIResource.Kind), s.reconcile, util.WorkerOptions{
136+
WorkerTiming: util.WorkerTiming{
137+
ClusterSyncDelay: s.clusterAvailableDelay,
138+
},
137139
})
138140

139141
// Build deliverer for triggering cluster reconciliations.

pkg/controller/util/worker.go

+32-16
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ type ReconcileWorker interface {
3838
SetDelay(retryDelay, clusterSyncDelay time.Duration)
3939
}
4040

41+
type WorkerOptions struct {
42+
WorkerTiming
43+
44+
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
45+
MaxConcurrentReconciles int
46+
}
47+
4148
type WorkerTiming struct {
4249
Interval time.Duration
4350
RetryDelay time.Duration
@@ -53,6 +60,8 @@ type asyncWorker struct {
5360

5461
timing WorkerTiming
5562

63+
maxConcurrentReconciles int
64+
5665
// For triggering reconciliation of a single resource. This is
5766
// used when there is an add/update/delete operation on a resource
5867
// in either the API of the cluster hosting KubeFed or in the API
@@ -66,26 +75,30 @@ type asyncWorker struct {
6675
backoff *flowcontrol.Backoff
6776
}
6877

69-
func NewReconcileWorker(name string, reconcile ReconcileFunc, timing WorkerTiming) ReconcileWorker {
70-
if timing.Interval == 0 {
71-
timing.Interval = time.Second * 1
78+
func NewReconcileWorker(name string, reconcile ReconcileFunc, options WorkerOptions) ReconcileWorker {
79+
if options.Interval == 0 {
80+
options.Interval = time.Second * 1
7281
}
73-
if timing.RetryDelay == 0 {
74-
timing.RetryDelay = time.Second * 10
82+
if options.RetryDelay == 0 {
83+
options.RetryDelay = time.Second * 10
7584
}
76-
if timing.InitialBackoff == 0 {
77-
timing.InitialBackoff = time.Second * 5
85+
if options.InitialBackoff == 0 {
86+
options.InitialBackoff = time.Second * 5
7887
}
79-
if timing.MaxBackoff == 0 {
80-
timing.MaxBackoff = time.Minute
88+
if options.MaxBackoff == 0 {
89+
options.MaxBackoff = time.Minute
90+
}
91+
if options.MaxConcurrentReconciles == 0 {
92+
options.MaxConcurrentReconciles = 1
8193
}
8294
return &asyncWorker{
83-
name: name,
84-
reconcile: reconcile,
85-
timing: timing,
86-
deliverer: NewDelayingDeliverer(),
87-
queue: workqueue.NewNamed(name),
88-
backoff: flowcontrol.NewBackOff(timing.InitialBackoff, timing.MaxBackoff),
95+
name: name,
96+
reconcile: reconcile,
97+
timing: options.WorkerTiming,
98+
maxConcurrentReconciles: options.MaxConcurrentReconciles,
99+
deliverer: NewDelayingDeliverer(),
100+
queue: workqueue.NewNamed(name),
101+
backoff: flowcontrol.NewBackOff(options.InitialBackoff, options.MaxBackoff),
89102
}
90103
}
91104

@@ -122,7 +135,10 @@ func (w *asyncWorker) Run(stopChan <-chan struct{}) {
122135
w.queue.Add(*qualifiedName)
123136
}
124137
})
125-
go wait.Until(w.worker, w.timing.Interval, stopChan)
138+
139+
for i := 0; i < w.maxConcurrentReconciles; i++ {
140+
go wait.Until(w.worker, w.timing.Interval, stopChan)
141+
}
126142

127143
// Ensure all goroutines are cleaned up when the stop channel closes
128144
go func() {

pkg/controller/util/worker_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func TestDeduplicate(t *testing.T) {
6161
})
6262
return StatusAllOK
6363
},
64-
WorkerTiming{},
64+
WorkerOptions{},
6565
)
6666

6767
// run worker

0 commit comments

Comments
 (0)