@@ -7,6 +7,7 @@ package observer
7
7
import (
8
8
"fmt"
9
9
"strings"
10
+ "sync"
10
11
"time"
11
12
12
13
"github.com/bpineau/katafygio/config"
@@ -32,14 +33,15 @@ type controllerCollection map[string]controller.Interface
32
33
33
34
// Observer watch api-server and manage kubernetes controllers lifecyles
34
35
type Observer struct {
35
- config * config.KfConfig
36
- stopCh chan struct {}
37
- doneCh chan struct {}
38
- notifier event.Notifier
39
- discovery discovery.DiscoveryInterface
40
- cpool dynamic.ClientPool
41
- ctrls controllerCollection
42
- factory ControllerFactory
36
+ sync.RWMutex // protect ctrls
37
+ config * config.KfConfig
38
+ stopCh chan struct {}
39
+ doneCh chan struct {}
40
+ notifier event.Notifier
41
+ discovery discovery.DiscoveryInterface
42
+ cpool dynamic.ClientPool
43
+ ctrls controllerCollection
44
+ factory ControllerFactory
43
45
}
44
46
45
47
type gvk struct {
@@ -94,16 +96,21 @@ func (c *Observer) Start() *Observer {
94
96
func (c * Observer ) Stop () {
95
97
c .config .Logger .Info ("Stopping all kubernetes controllers" )
96
98
97
- close ( c .stopCh )
99
+ c .stopCh <- struct {}{}
98
100
101
+ c .RLock ()
99
102
for _ , ct := range c .ctrls {
100
103
ct .Stop ()
101
104
}
105
+ c .RUnlock ()
102
106
103
107
<- c .doneCh
104
108
}
105
109
106
110
func (c * Observer ) refresh () error {
111
+ c .Lock ()
112
+ defer c .Unlock ()
113
+
107
114
groups , err := c .discovery .ServerResources ()
108
115
if err != nil {
109
116
return fmt .Errorf ("failed to collect server resources: %v" , err )
0 commit comments