@@ -10,7 +10,6 @@ import (
10
10
"strings"
11
11
"time"
12
12
13
- "github.com/bpineau/katafygio/config"
14
13
"github.com/bpineau/katafygio/pkg/event"
15
14
16
15
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -37,25 +36,44 @@ type Interface interface {
37
36
Stop ()
38
37
}
39
38
39
+ type logger interface {
40
+ Infof (format string , args ... interface {})
41
+ Errorf (format string , args ... interface {})
42
+ }
43
+
40
44
// Factory generate controllers
41
- type Factory struct {}
45
+ type Factory struct {
46
+ logger logger
47
+ filter string
48
+ resyncIntv time.Duration
49
+ excluded []string
50
+ }
42
51
43
52
// Controller is a generic kubernetes controller
44
53
type Controller struct {
45
- name string
46
- stopCh chan struct {}
47
- doneCh chan struct {}
48
- syncCh chan struct {}
49
- notifier event.Notifier
50
- config * config.KfConfig
51
- queue workqueue.RateLimitingInterface
52
- informer cache.SharedIndexInformer
54
+ name string
55
+ stopCh chan struct {}
56
+ doneCh chan struct {}
57
+ syncCh chan struct {}
58
+ notifier event.Notifier
59
+ queue workqueue.RateLimitingInterface
60
+ informer cache.SharedIndexInformer
61
+ logger logger
62
+ resyncIntv time.Duration
63
+ excluded []string
53
64
}
54
65
55
66
// New return a kubernetes controller using the provided client
56
- func New (client cache.ListerWatcher , notifier event.Notifier , name string , config * config.KfConfig ) * Controller {
57
-
58
- selector := metav1.ListOptions {LabelSelector : config .Filter }
67
+ func New (client cache.ListerWatcher ,
68
+ notifier event.Notifier ,
69
+ log logger ,
70
+ name string ,
71
+ filter string ,
72
+ resync time.Duration ,
73
+ excluded []string ,
74
+ ) * Controller {
75
+
76
+ selector := metav1.ListOptions {LabelSelector : filter }
59
77
lw := & cache.ListWatch {
60
78
ListFunc : func (options metav1.ListOptions ) (runtime.Object , error ) {
61
79
return client .List (selector )
@@ -68,7 +86,7 @@ func New(client cache.ListerWatcher, notifier event.Notifier, name string, confi
68
86
informer := cache .NewSharedIndexInformer (
69
87
lw ,
70
88
& unstructured.Unstructured {},
71
- config . ResyncIntv ,
89
+ resync ,
72
90
cache.Indexers {},
73
91
)
74
92
@@ -96,20 +114,22 @@ func New(client cache.ListerWatcher, notifier event.Notifier, name string, confi
96
114
})
97
115
98
116
return & Controller {
99
- stopCh : make (chan struct {}),
100
- doneCh : make (chan struct {}),
101
- syncCh : make (chan struct {}, 1 ),
102
- notifier : notifier ,
103
- name : name ,
104
- config : config ,
105
- queue : queue ,
106
- informer : informer ,
117
+ stopCh : make (chan struct {}),
118
+ doneCh : make (chan struct {}),
119
+ syncCh : make (chan struct {}, 1 ),
120
+ notifier : notifier ,
121
+ name : name ,
122
+ queue : queue ,
123
+ informer : informer ,
124
+ logger : log ,
125
+ resyncIntv : resync ,
126
+ excluded : excluded ,
107
127
}
108
128
}
109
129
110
130
// Start launchs the controller in the background
111
131
func (c * Controller ) Start () {
112
- c .config . Logger .Infof ("Starting %s controller" , c .name )
132
+ c .logger .Infof ("Starting %s controller" , c .name )
113
133
defer utilruntime .HandleCrash ()
114
134
115
135
go c .informer .Run (c .stopCh )
@@ -126,7 +146,7 @@ func (c *Controller) Start() {
126
146
127
147
// Stop halts the controller
128
148
func (c * Controller ) Stop () {
129
- c .config . Logger .Infof ("Stopping %s controller" , c .name )
149
+ c .logger .Infof ("Stopping %s controller" , c .name )
130
150
<- c .syncCh
131
151
close (c .stopCh )
132
152
c .queue .ShutDown ()
@@ -148,7 +168,7 @@ func (c *Controller) processNextItem() bool {
148
168
defer c .queue .Done (key )
149
169
150
170
if strings .Compare (key .(string ), canaryKey ) == 0 {
151
- c .config . Logger .Infof ("Initial sync completed for %s controller" , c .name )
171
+ c .logger .Infof ("Initial sync completed for %s controller" , c .name )
152
172
c .syncCh <- struct {}{}
153
173
c .queue .Forget (key )
154
174
return true
@@ -160,11 +180,11 @@ func (c *Controller) processNextItem() bool {
160
180
// No error, reset the ratelimit counters
161
181
c .queue .Forget (key )
162
182
} else if c .queue .NumRequeues (key ) < maxProcessRetry {
163
- c .config . Logger .Errorf ("Error processing %s (will retry): %v" , key , err )
183
+ c .logger .Errorf ("Error processing %s (will retry): %v" , key , err )
164
184
c .queue .AddRateLimited (key )
165
185
} else {
166
186
// err != nil and too many retries
167
- c .config . Logger .Errorf ("Error processing %s (giving up): %v" , key , err )
187
+ c .logger .Errorf ("Error processing %s (giving up): %v" , key , err )
168
188
c .queue .Forget (key )
169
189
}
170
190
@@ -178,7 +198,7 @@ func (c *Controller) processItem(key string) error {
178
198
return fmt .Errorf ("error fetching %s from store: %v" , key , err )
179
199
}
180
200
181
- for _ , obj := range c .config . ExcludeObject {
201
+ for _ , obj := range c .excluded {
182
202
if strings .Compare (strings .ToLower (obj ), strings .ToLower (c .name + ":" + key )) == 0 {
183
203
return nil
184
204
}
@@ -200,8 +220,6 @@ func (c *Controller) processItem(key string) error {
200
220
delete (md , attr )
201
221
}
202
222
203
- c .config .Logger .Debugf ("Found %s/%s [%s]" , obj .GetAPIVersion (), obj .GetKind (), key )
204
-
205
223
yml , err := yaml .Marshal (obj )
206
224
if err != nil {
207
225
return fmt .Errorf ("failed to marshal %s: %v" , key , err )
@@ -215,7 +233,17 @@ func (c *Controller) enqueue(notif *event.Notification) {
215
233
c .notifier .Send (notif )
216
234
}
217
235
236
+ // NewFactory create a controller factory
237
+ func NewFactory (logger logger , filter string , resync int , excluded []string ) * Factory {
238
+ return & Factory {
239
+ logger : logger ,
240
+ filter : filter ,
241
+ resyncIntv : time .Duration (resync ) * time .Second ,
242
+ excluded : excluded ,
243
+ }
244
+ }
245
+
218
246
// NewController create a controller.Controller
219
- func (f * Factory ) NewController (client cache.ListerWatcher , notifier event.Notifier , name string , config * config. KfConfig ) Interface {
220
- return New (client , notifier , name , config )
247
+ func (f * Factory ) NewController (client cache.ListerWatcher , notifier event.Notifier , name string ) Interface {
248
+ return New (client , notifier , f . logger , name , f . filter , f . resyncIntv , f . excluded )
221
249
}
0 commit comments