Skip to content

Commit 62233b3

Browse files
committed
fix #2
Signed-off-by: Mikhail Scherba <[email protected]>
1 parent 153e695 commit 62233b3

File tree

2 files changed

+53
-35
lines changed

2 files changed

+53
-35
lines changed

pkg/hook/controller/kubernetes_bindings_controller.go

+41-23
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,10 @@ func (c *kubernetesBindingsController) EnableKubernetesBindings() ([]BindingExec
8585
if err != nil {
8686
return nil, fmt.Errorf("run monitor: %s", err)
8787
}
88-
c.l.Lock()
89-
c.BindingMonitorLinks[config.Monitor.Metadata.MonitorId] = &KubernetesBindingToMonitorLink{
88+
c.setBindingMonitorLinks(config.Monitor.Metadata.MonitorId, &KubernetesBindingToMonitorLink{
9089
MonitorId: config.Monitor.Metadata.MonitorId,
9190
BindingConfig: config,
92-
}
93-
c.l.Unlock()
91+
})
9492
// Start monitor's informers to fill the cache.
9593
c.kubeEventsManager.StartMonitor(config.Monitor.Metadata.MonitorId)
9694

@@ -106,9 +104,7 @@ func (c *kubernetesBindingsController) EnableKubernetesBindings() ([]BindingExec
106104

107105
func (c *kubernetesBindingsController) UpdateMonitor(monitorId string, kind, apiVersion string) error {
108106
// Find binding for monitorId
109-
c.l.RLock()
110-
link, ok := c.BindingMonitorLinks[monitorId]
111-
c.l.RUnlock()
107+
link, ok := c.getBindingMonitorLinksById(monitorId)
112108
if !ok {
113109
return nil
114110
}
@@ -158,12 +154,11 @@ func (c *kubernetesBindingsController) UpdateMonitor(monitorId string, kind, api
158154

159155
// UnlockEvents turns on eventCb for all monitors to emit events after Synchronization.
160156
func (c *kubernetesBindingsController) UnlockEvents() {
161-
c.l.RLock()
162-
for monitorID := range c.BindingMonitorLinks {
157+
c.iterateBindingMonitorLinks(func(monitorID string) bool {
163158
m := c.kubeEventsManager.GetMonitor(monitorID)
164159
m.EnableKubeEventCb()
165-
}
166-
c.l.RUnlock()
160+
return true
161+
})
167162
}
168163

169164
// UnlockEventsFor turns on eventCb for matched monitor to emit events after Synchronization.
@@ -179,30 +174,53 @@ func (c *kubernetesBindingsController) UnlockEventsFor(monitorID string) {
179174
// StopMonitors stops all monitors for the hook.
180175
// TODO handle error!
181176
func (c *kubernetesBindingsController) StopMonitors() {
182-
c.l.RLock()
183-
for monitorID := range c.BindingMonitorLinks {
177+
c.iterateBindingMonitorLinks(func(monitorID string) bool {
184178
_ = c.kubeEventsManager.StopMonitor(monitorID)
185-
}
186-
c.l.RUnlock()
179+
return true
180+
})
187181
}
188182

189183
func (c *kubernetesBindingsController) CanHandleEvent(kubeEvent kemtypes.KubeEvent) bool {
190-
c.l.RLock()
191-
defer c.l.RUnlock()
192-
for key := range c.BindingMonitorLinks {
193-
if key == kubeEvent.MonitorId {
184+
var canHandleEvent bool
185+
186+
c.iterateBindingMonitorLinks(func(monitorID string) bool {
187+
if monitorID == kubeEvent.MonitorId {
188+
canHandleEvent = true
194189
return true
195190
}
191+
return false
192+
})
193+
194+
return canHandleEvent
195+
}
196+
197+
func (c *kubernetesBindingsController) iterateBindingMonitorLinks(doFn func(monitorID string) bool) {
198+
c.l.RLock()
199+
for monitorID := range c.BindingMonitorLinks {
200+
if exit := doFn(monitorID); exit {
201+
break
202+
}
196203
}
197-
return false
204+
c.l.RUnlock()
205+
}
206+
207+
func (c *kubernetesBindingsController) getBindingMonitorLinksById(monitorId string) (*KubernetesBindingToMonitorLink, bool) {
208+
c.l.RLock()
209+
link, found := c.BindingMonitorLinks[monitorId]
210+
c.l.RUnlock()
211+
return link, found
212+
}
213+
214+
func (c *kubernetesBindingsController) setBindingMonitorLinks(monitorId string, link *KubernetesBindingToMonitorLink) {
215+
c.l.Lock()
216+
c.BindingMonitorLinks[monitorId] = link
217+
c.l.Unlock()
198218
}
199219

200220
// HandleEvent receives event from KubeEventManager and returns a BindingExecutionInfo
201221
// to help create a new task to run a hook.
202222
func (c *kubernetesBindingsController) HandleEvent(kubeEvent kemtypes.KubeEvent) BindingExecutionInfo {
203-
c.l.RLock()
204-
link, hasKey := c.BindingMonitorLinks[kubeEvent.MonitorId]
205-
c.l.RUnlock()
223+
link, hasKey := c.getBindingMonitorLinksById(kubeEvent.MonitorId)
206224
if !hasKey {
207225
log.Error("Possible bug!!! Unknown kube event: no such monitor id registered", slog.String("monitorID", kubeEvent.MonitorId))
208226
return BindingExecutionInfo{

pkg/task/queue/task_queue.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -413,18 +413,18 @@ func (q *TaskQueue) Start() {
413413

414414
if q.Handler == nil {
415415
log.Error("should set handler before start in queue", slog.String("name", q.Name))
416-
q.Status = "no handler set"
416+
q.SetStatus("no handler set")
417417
return
418418
}
419419

420420
go func() {
421-
q.Status = ""
421+
q.SetStatus("")
422422
var sleepDelay time.Duration
423423
for {
424424
q.debugf("queue %s: wait for task, delay %d", q.Name, sleepDelay)
425425
t := q.waitForTask(sleepDelay)
426426
if t == nil {
427-
q.Status = "stop"
427+
q.SetStatus("stop")
428428
log.Info("queue stopped", slog.String("name", q.Name))
429429
return
430430
}
@@ -435,14 +435,14 @@ func (q *TaskQueue) Start() {
435435

436436
// Now the task can be handled!
437437
var nextSleepDelay time.Duration
438-
q.Status = "run first task"
438+
q.SetStatus("run first task")
439439
taskRes := q.Handler(t)
440440

441441
// Check Done channel after long-running operation.
442442
select {
443443
case <-q.ctx.Done():
444444
log.Info("queue stopped after task handling", slog.String("name", q.Name))
445-
q.Status = "stop"
445+
q.SetStatus("stop")
446446
return
447447
default:
448448
}
@@ -452,7 +452,7 @@ func (q *TaskQueue) Start() {
452452
// Exponential backoff delay before retry.
453453
nextSleepDelay = q.ExponentialBackoffFn(t.GetFailureCount())
454454
t.IncrementFailureCount()
455-
q.Status = fmt.Sprintf("sleep after fail for %s", nextSleepDelay.String())
455+
q.SetStatus(fmt.Sprintf("sleep after fail for %s", nextSleepDelay.String()))
456456
case Success, Keep:
457457
// Insert new tasks right after the current task in reverse order.
458458
q.withLock(func() {
@@ -474,16 +474,16 @@ func (q *TaskQueue) Start() {
474474
q.addLast(newTask)
475475
}
476476
})
477-
q.Status = ""
477+
q.SetStatus("")
478478
case Repeat:
479479
// repeat a current task after a small delay
480480
nextSleepDelay = q.DelayOnRepeat
481-
q.Status = "repeat head task"
481+
q.SetStatus("repeat head task")
482482
}
483483

484484
if taskRes.DelayBeforeNextTask != 0 {
485485
nextSleepDelay = taskRes.DelayBeforeNextTask
486-
q.Status = fmt.Sprintf("sleep for %s", nextSleepDelay.String())
486+
q.SetStatus(fmt.Sprintf("sleep for %s", nextSleepDelay.String()))
487487
}
488488

489489
sleepDelay = nextSleepDelay
@@ -527,7 +527,7 @@ func (q *TaskQueue) waitForTask(sleepDelay time.Duration) task.Task {
527527
q.cancelDelay = false
528528
q.waitMu.Unlock()
529529

530-
origStatus := q.Status
530+
origStatus := q.GetStatus()
531531

532532
defer func() {
533533
checkTicker.Stop()
@@ -578,10 +578,10 @@ func (q *TaskQueue) waitForTask(sleepDelay time.Duration) task.Task {
578578
// Wait loop still in progress: update queue status.
579579
waitTime := time.Since(waitBegin).Truncate(time.Second)
580580
if sleepDelay == 0 {
581-
q.Status = fmt.Sprintf("waiting for task %s", waitTime.String())
581+
q.SetStatus(fmt.Sprintf("waiting for task %s", waitTime.String()))
582582
} else {
583583
delay := sleepDelay.Truncate(time.Second)
584-
q.Status = fmt.Sprintf("%s (%s left of %s delay)", origStatus, (delay - waitTime).String(), delay.String())
584+
q.SetStatus(fmt.Sprintf("%s (%s left of %s delay)", origStatus, (delay - waitTime).String(), delay.String()))
585585
}
586586
}
587587
}

0 commit comments

Comments
 (0)