Skip to content

Commit 77de993

Browse files
committed
ListWatch is controllers responsibility
This should have moved when observer was separated from controllers.
1 parent 69009f5 commit 77de993

File tree

2 files changed

+42
-37
lines changed

2 files changed

+42
-37
lines changed

pkg/controller/controller.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ import (
1212
"github.com/bpineau/katafygio/config"
1313
"github.com/bpineau/katafygio/pkg/event"
1414

15+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1516
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
17+
"k8s.io/apimachinery/pkg/runtime"
1618
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1719
"k8s.io/apimachinery/pkg/util/wait"
20+
"k8s.io/apimachinery/pkg/watch"
1821
"k8s.io/client-go/tools/cache"
1922
"k8s.io/client-go/util/workqueue"
2023

@@ -34,9 +37,18 @@ type Controller struct {
3437
informer cache.SharedIndexInformer
3538
}
3639

37-
// New return an untyped, generic Kubernetes controller
38-
func New(lw cache.ListerWatcher, notifier *event.Notifier, name string, config *config.KfConfig) *Controller {
39-
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
40+
// New return a kubernetes controller using the provided client
41+
func New(client cache.ListerWatcher, notifier *event.Notifier, name string, config *config.KfConfig) *Controller {
42+
43+
selector := metav1.ListOptions{LabelSelector: config.Filter}
44+
lw := &cache.ListWatch{
45+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
46+
return client.List(selector)
47+
},
48+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
49+
return client.Watch(selector)
50+
},
51+
}
4052

4153
informer := cache.NewSharedIndexInformer(
4254
lw,
@@ -45,6 +57,8 @@ func New(lw cache.ListerWatcher, notifier *event.Notifier, name string, config *
4557
cache.Indexers{},
4658
)
4759

60+
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
61+
4862
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
4963
AddFunc: func(obj interface{}) {
5064
key, err := cache.MetaNamespaceKeyFunc(obj)

pkg/observer/observer.go

+25-34
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,9 @@ import (
1414
"github.com/bpineau/katafygio/pkg/event"
1515

1616
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17-
"k8s.io/apimachinery/pkg/runtime"
1817
"k8s.io/apimachinery/pkg/runtime/schema"
19-
"k8s.io/apimachinery/pkg/watch"
2018
"k8s.io/client-go/discovery"
2119
"k8s.io/client-go/dynamic"
22-
"k8s.io/client-go/tools/cache"
2320
)
2421

2522
const discoveryInterval = 60 * time.Second
@@ -36,17 +33,13 @@ type Observer struct {
3633
}
3734

3835
type gvk struct {
39-
group string
40-
version string
41-
kind string
42-
gv schema.GroupVersion
43-
ar metav1.APIResource
36+
groupVersion schema.GroupVersion
37+
apiResource metav1.APIResource
4438
}
4539

4640
type resources map[string]*gvk
4741

48-
// New returns a new observer, that will watch for api resource kinds
49-
// and create new controllers for each one.
42+
// New returns a new observer, that will watch API resources and create controllers
5043
func New(config *config.KfConfig, notif *event.Notifier) *Observer {
5144
return &Observer{
5245
config: config,
@@ -72,7 +65,7 @@ func (c *Observer) Start() *Observer {
7265
for {
7366
err := c.refresh()
7467
if err != nil {
75-
c.config.Logger.Errorf("Failed to refresh: %v", err)
68+
c.config.Logger.Errorf("Refresh failed: %v", err)
7669
}
7770

7871
select {
@@ -92,8 +85,8 @@ func (c *Observer) Stop() {
9285

9386
close(c.stop)
9487

95-
for _, c := range c.ctrls {
96-
c.Stop()
88+
for _, ct := range c.ctrls {
89+
ct.Stop()
9790
}
9891

9992
<-c.done
@@ -110,30 +103,27 @@ func (c *Observer) refresh() error {
110103
continue
111104
}
112105

113-
cl, err := c.cpool.ClientForGroupVersionKind(res.gv.WithKind(res.kind))
106+
kind := res.apiResource.Kind
107+
gk := res.groupVersion.WithKind(kind)
108+
cname := strings.ToLower(kind)
109+
110+
cl, err := c.cpool.ClientForGroupVersionKind(gk)
114111
if err != nil {
115-
return fmt.Errorf("failed to get a cpool for %s", name)
112+
return fmt.Errorf("failed to get a client for %s", name)
116113
}
117114

118-
client := cl.Resource(res.ar.DeepCopy(), metav1.NamespaceAll)
119-
120-
selector := metav1.ListOptions{LabelSelector: c.config.Filter}
121-
lw := &cache.ListWatch{
122-
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
123-
return client.List(selector)
124-
},
125-
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
126-
return client.Watch(selector)
127-
},
128-
}
115+
client := cl.Resource(res.apiResource.DeepCopy(), metav1.NamespaceAll)
129116

130-
c.ctrls[name] = controller.New(lw, c.notif, strings.ToLower(res.ar.Kind), c.config)
117+
c.ctrls[name] = controller.New(client, c.notif, cname, c.config)
131118
go c.ctrls[name].Start()
132119
}
133120

134121
return nil
135122
}
136123

124+
// The api-server may expose a resource under several API groups, for backward
125+
// compatibility. We'll want to ignore lower priorities "cohabitations":
126+
// cf. kubernetes/cmd/kube-apiserver/app/server.go
137127
var preferredVersions = map[string]string{
138128
"apps:deployment": "extensions:deployment",
139129
"apps:daemonset": "extensions:daemonset",
@@ -149,12 +139,12 @@ func (c *Observer) expandAndFilterAPIResources(groups []*metav1.APIResourceList)
149139
for _, group := range groups {
150140
gv, err := schema.ParseGroupVersion(group.GroupVersion)
151141
if err != nil {
152-
c.config.Logger.Errorf("api-server sent an unparsable group version: %v", err)
142+
c.config.Logger.Errorf("unparsable group version: %v", err)
153143
continue
154144
}
155145

156146
for _, ar := range group.APIResources {
157-
// remove subresources (like job/status or deployments/scale)
147+
// remove subresources (like job/status)
158148
if strings.ContainsRune(ar.Name, '/') {
159149
continue
160150
}
@@ -169,13 +159,13 @@ func (c *Observer) expandAndFilterAPIResources(groups []*metav1.APIResourceList)
169159
continue
170160
}
171161

172-
resource := &gvk{group: gv.Group, version: gv.Version, kind: ar.Kind, gv: gv, ar: ar}
173-
resources[strings.ToLower(gv.Group+":"+ar.Kind)] = resource
162+
resources[strings.ToLower(gv.Group+":"+ar.Kind)] = &gvk{
163+
groupVersion: gv,
164+
apiResource: ar,
165+
}
174166
}
175167
}
176168

177-
// remove lower priorities "cohabitations". cf. kubernetes/cmd/kube-apiserver/app/server.go
178-
// (the api-server may expose a resource under several api groups, for backward compat)
179169
for preferred, obsolete := range preferredVersions {
180170
if _, ok := resources[preferred]; ok {
181171
delete(resources, obsolete)
@@ -186,8 +176,9 @@ func (c *Observer) expandAndFilterAPIResources(groups []*metav1.APIResourceList)
186176
}
187177

188178
func isExcluded(excluded []string, name string) bool {
179+
lname := strings.ToLower(name)
189180
for _, ctl := range excluded {
190-
if strings.Compare(strings.ToLower(name), strings.ToLower(ctl)) == 0 {
181+
if strings.Compare(lname, strings.ToLower(ctl)) == 0 {
191182
return true
192183
}
193184
}

0 commit comments

Comments
 (0)