@@ -17,19 +17,29 @@ import (
17
17
"k8s.io/apimachinery/pkg/runtime/schema"
18
18
"k8s.io/client-go/discovery"
19
19
"k8s.io/client-go/dynamic"
20
+ "k8s.io/client-go/tools/cache"
20
21
)
21
22
22
23
const discoveryInterval = 60 * time .Second
23
24
24
- // Observer watch api-server and manage kubernetes controllers
25
+ // ControllerFactory make controllers generation interchangeable
26
+ type ControllerFactory interface {
27
+ NewController (client cache.ListerWatcher , notifier event.Notifier ,
28
+ name string , config * config.KfConfig ) controller.Interface
29
+ }
30
+
31
+ type controllerCollection map [string ]controller.Interface
32
+
33
+ // Observer watch api-server and manage kubernetes controllers lifecyles
25
34
type Observer struct {
26
- stop chan struct {}
27
- done chan struct {}
28
- notif event.Notifier
29
- disc discovery.DiscoveryInterface
30
- cpool dynamic.ClientPool
31
- ctrls map [string ]* controller.Controller
32
- config * config.KfConfig
35
+ config * config.KfConfig
36
+ stopCh chan struct {}
37
+ doneCh chan struct {}
38
+ notifier event.Notifier
39
+ discovery discovery.DiscoveryInterface
40
+ cpool dynamic.ClientPool
41
+ ctrls controllerCollection
42
+ factory ControllerFactory
33
43
}
34
44
35
45
type gvk struct {
@@ -40,27 +50,28 @@ type gvk struct {
40
50
type resources map [string ]* gvk
41
51
42
52
// New returns a new observer, that will watch API resources and create controllers
43
- func New (config * config.KfConfig , notif event.Notifier ) * Observer {
53
+ func New (config * config.KfConfig , notif event.Notifier , factory ControllerFactory ) * Observer {
44
54
return & Observer {
45
- config : config ,
46
- notif : notif ,
47
- disc : discovery .NewDiscoveryClientForConfigOrDie (config .Client ),
48
- cpool : dynamic .NewDynamicClientPool (config .Client ),
49
- ctrls : make (map [string ]* controller.Controller ),
55
+ config : config ,
56
+ notifier : notif ,
57
+ discovery : discovery .NewDiscoveryClientForConfigOrDie (config .Client ),
58
+ cpool : dynamic .NewDynamicClientPool (config .Client ),
59
+ ctrls : make (controllerCollection ),
60
+ factory : factory ,
50
61
}
51
62
}
52
63
53
64
// Start starts the observer in a detached goroutine
54
65
func (c * Observer ) Start () * Observer {
55
66
c .config .Logger .Info ("Starting all kubernetes controllers" )
56
67
57
- c .stop = make (chan struct {})
58
- c .done = make (chan struct {})
68
+ c .stopCh = make (chan struct {})
69
+ c .doneCh = make (chan struct {})
59
70
60
71
go func () {
61
72
ticker := time .NewTicker (discoveryInterval )
62
73
defer ticker .Stop ()
63
- defer close (c .done )
74
+ defer close (c .doneCh )
64
75
65
76
for {
66
77
err := c .refresh ()
@@ -69,7 +80,7 @@ func (c *Observer) Start() *Observer {
69
80
}
70
81
71
82
select {
72
- case <- c .stop :
83
+ case <- c .stopCh :
73
84
return
74
85
case <- ticker .C :
75
86
}
@@ -83,17 +94,17 @@ func (c *Observer) Start() *Observer {
83
94
func (c * Observer ) Stop () {
84
95
c .config .Logger .Info ("Stopping all kubernetes controllers" )
85
96
86
- close (c .stop )
97
+ close (c .stopCh )
87
98
88
99
for _ , ct := range c .ctrls {
89
100
ct .Stop ()
90
101
}
91
102
92
- <- c .done
103
+ <- c .doneCh
93
104
}
94
105
95
106
func (c * Observer ) refresh () error {
96
- groups , err := c .disc .ServerResources ()
107
+ groups , err := c .discovery .ServerResources ()
97
108
if err != nil {
98
109
return fmt .Errorf ("failed to collect server resources: %v" , err )
99
110
}
@@ -114,7 +125,7 @@ func (c *Observer) refresh() error {
114
125
115
126
client := cl .Resource (res .apiResource .DeepCopy (), metav1 .NamespaceAll )
116
127
117
- c .ctrls [name ] = controller . New (client , c .notif , cname , c .config )
128
+ c .ctrls [name ] = c . factory . NewController (client , c .notifier , cname , c .config )
118
129
go c .ctrls [name ].Start ()
119
130
}
120
131
0 commit comments