Skip to content

Commit 2f60052

Browse files
committed
Dedicated notifier object to mediate events
This will probably help with unit tests.
1 parent 3eebcf1 commit 2f60052

File tree

5 files changed

+68
-48
lines changed

5 files changed

+68
-48
lines changed

pkg/controller/controller.go

+9-27
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/bpineau/katafygio/config"
13+
"github.com/bpineau/katafygio/pkg/event"
1314

1415
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1516
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -22,38 +23,19 @@ import (
2223

2324
const maxProcessRetry = 6
2425

25-
// Action represents the kind of object change we're notifying
26-
type Action int
27-
28-
const (
29-
// Delete is the object deletion Action
30-
Delete Action = iota
31-
32-
// Upsert is the update or create Action
33-
Upsert
34-
)
35-
36-
// Event conveys an object delete/upsert notification
37-
type Event struct {
38-
Action Action
39-
Key string
40-
Kind string
41-
Obj string
42-
}
43-
4426
// Controller is a generic kubernetes controller
4527
type Controller struct {
4628
stopCh chan struct{}
4729
doneCh chan struct{}
48-
evchan chan Event
49-
name string
30+
notifier *event.Notifier
5031
config *config.KfConfig
32+
name string
5133
queue workqueue.RateLimitingInterface
5234
informer cache.SharedIndexInformer
5335
}
5436

5537
// New return an untyped, generic Kubernetes controller
56-
func New(lw cache.ListerWatcher, evchan chan Event, name string, config *config.KfConfig) *Controller {
38+
func New(lw cache.ListerWatcher, notifier *event.Notifier, name string, config *config.KfConfig) *Controller {
5739
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
5840

5941
informer := cache.NewSharedIndexInformer(
@@ -87,7 +69,7 @@ func New(lw cache.ListerWatcher, evchan chan Event, name string, config *config.
8769
return &Controller{
8870
stopCh: make(chan struct{}),
8971
doneCh: make(chan struct{}),
90-
evchan: evchan,
72+
notifier: notifier,
9173
name: name,
9274
config: config,
9375
queue: queue,
@@ -158,7 +140,7 @@ func (c *Controller) processItem(key string) error {
158140

159141
if !exists {
160142
// deleted object
161-
c.enqueue(Event{Action: Delete, Key: key, Kind: c.name, Obj: ""})
143+
c.enqueue(&event.Notification{Action: event.Delete, Key: key, Kind: c.name, Object: ""})
162144
return nil
163145
}
164146

@@ -180,10 +162,10 @@ func (c *Controller) processItem(key string) error {
180162
return fmt.Errorf("failed to marshal %s: %v", key, err)
181163
}
182164

183-
c.enqueue(Event{Action: Upsert, Key: key, Kind: c.name, Obj: string(yml)})
165+
c.enqueue(&event.Notification{Action: event.Upsert, Key: key, Kind: c.name, Object: string(yml)})
184166
return nil
185167
}
186168

187-
func (c *Controller) enqueue(ev Event) {
188-
c.evchan <- ev
169+
func (c *Controller) enqueue(notif *event.Notification) {
170+
c.notifier.Send(notif)
189171
}

pkg/event/event.go

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Package event mediates notification between controllers and recorder
2+
package event
3+
4+
// Action represents the kind of object change we're notifying
5+
type Action int
6+
7+
const (
8+
// Delete is the object deletion Action
9+
Delete Action = iota
10+
11+
// Upsert is the update or create Action
12+
Upsert
13+
)
14+
15+
// Notification conveys an object delete/upsert notification
16+
type Notification struct {
17+
Action Action
18+
Key string
19+
Kind string
20+
Object string
21+
}
22+
23+
// Notifier mediates notifications between controllers and recorder
24+
type Notifier struct {
25+
C chan Notification
26+
}
27+
28+
// New creates a new event.Notifier
29+
func New() *Notifier {
30+
return &Notifier{
31+
C: make(chan Notification),
32+
}
33+
}
34+
35+
// Send sends a notification
36+
func (n *Notifier) Send(notif *Notification) {
37+
n.C <- *notif
38+
}

pkg/observer/observer.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/bpineau/katafygio/config"
1313
"github.com/bpineau/katafygio/pkg/controller"
14+
"github.com/bpineau/katafygio/pkg/event"
1415

1516
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1617
"k8s.io/apimachinery/pkg/runtime"
@@ -27,7 +28,7 @@ const discoveryInterval = 60 * time.Second
2728
type Observer struct {
2829
stop chan struct{}
2930
done chan struct{}
30-
evch chan controller.Event
31+
notif *event.Notifier
3132
disc *discovery.DiscoveryClient
3233
cpool dynamic.ClientPool
3334
ctrls map[string]*controller.Controller
@@ -46,10 +47,10 @@ type resources map[string]*gvk
4647

4748
// New returns a new observer, that will watch for api resource kinds
4849
// and create new controllers for each one.
49-
func New(config *config.KfConfig, evch chan controller.Event) *Observer {
50+
func New(config *config.KfConfig, notif *event.Notifier) *Observer {
5051
return &Observer{
5152
config: config,
52-
evch: evch,
53+
notif: notif,
5354
disc: discovery.NewDiscoveryClientForConfigOrDie(config.Client),
5455
cpool: dynamic.NewDynamicClientPool(config.Client),
5556
ctrls: make(map[string]*controller.Controller),
@@ -126,7 +127,7 @@ func (c *Observer) refresh() error {
126127
},
127128
}
128129

129-
c.ctrls[name] = controller.New(lw, c.evch, strings.ToLower(res.ar.Kind), c.config)
130+
c.ctrls[name] = controller.New(lw, c.notif, strings.ToLower(res.ar.Kind), c.config)
130131
go c.ctrls[name].Start()
131132
}
132133

pkg/recorder/recorder.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/bpineau/katafygio/config"
14-
"github.com/bpineau/katafygio/pkg/controller"
14+
"github.com/bpineau/katafygio/pkg/event"
1515
)
1616

1717
// activeFiles will contain a list of active (present in cluster) objets; we'll
@@ -22,18 +22,18 @@ type activeFiles map[string]bool
2222
// Listener receive events from controllers and save them to disk as yaml files
2323
type Listener struct {
2424
config *config.KfConfig
25-
evchan chan controller.Event
25+
events *event.Notifier
2626
actives activeFiles
2727
activesLock sync.RWMutex
2828
stopch chan struct{}
2929
donech chan struct{}
3030
}
3131

3232
// New creates a new Listener
33-
func New(config *config.KfConfig, evchan chan controller.Event) *Listener {
33+
func New(config *config.KfConfig, events *event.Notifier) *Listener {
3434
return &Listener{
3535
config: config,
36-
evchan: evchan,
36+
events: events,
3737
actives: activeFiles{},
3838
}
3939
}
@@ -57,8 +57,8 @@ func (w *Listener) Start() *Listener {
5757
select {
5858
case <-w.stopch:
5959
return
60-
case ev := <-w.evchan:
61-
w.processNextEvent(ev)
60+
case ev := <-w.events.C:
61+
w.processNextEvent(&ev)
6262
case <-gcTick.C:
6363
w.deleteObsoleteFiles()
6464
}
@@ -75,7 +75,7 @@ func (w *Listener) Stop() {
7575
<-w.donech
7676
}
7777

78-
func (w *Listener) processNextEvent(ev controller.Event) {
78+
func (w *Listener) processNextEvent(ev *event.Notification) {
7979
if w.shouldIgnore(ev) {
8080
return
8181
}
@@ -88,9 +88,9 @@ func (w *Listener) processNextEvent(ev controller.Event) {
8888
}
8989

9090
switch ev.Action {
91-
case controller.Upsert:
92-
err = w.save(path, ev.Obj)
93-
case controller.Delete:
91+
case event.Upsert:
92+
err = w.save(path, ev.Object)
93+
case event.Delete:
9494
err = w.remove(path)
9595
}
9696

@@ -99,7 +99,7 @@ func (w *Listener) processNextEvent(ev controller.Event) {
9999
}
100100
}
101101

102-
func (w *Listener) shouldIgnore(ev controller.Event) bool {
102+
func (w *Listener) shouldIgnore(ev *event.Notification) bool {
103103
for _, kind := range w.config.ExcludeKind {
104104
if strings.Compare(strings.ToLower(kind), ev.Kind) == 0 {
105105
return true
@@ -115,7 +115,7 @@ func (w *Listener) shouldIgnore(ev controller.Event) bool {
115115
return w.config.DryRun
116116
}
117117

118-
func getPath(root string, ev controller.Event) (string, error) {
118+
func getPath(root string, ev *event.Notification) (string, error) {
119119
filename := ev.Kind + "-" + filepath.Base(ev.Key) + ".yaml"
120120

121121
dir, err := filepath.Abs(filepath.Dir(root + "/" + ev.Key))

pkg/run/run.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"syscall"
99

1010
"github.com/bpineau/katafygio/config"
11-
"github.com/bpineau/katafygio/pkg/controller"
11+
"github.com/bpineau/katafygio/pkg/event"
1212
"github.com/bpineau/katafygio/pkg/health"
1313
"github.com/bpineau/katafygio/pkg/observer"
1414
"github.com/bpineau/katafygio/pkg/recorder"
@@ -22,10 +22,9 @@ func Run(config *config.KfConfig) {
2222
config.Logger.Fatalf("failed to start git repo handler: %v", err)
2323
}
2424

25-
evchan := make(chan controller.Event)
26-
27-
reco := recorder.New(config, evchan).Start()
28-
ctrl := observer.New(config, evchan).Start()
25+
evts := event.New()
26+
reco := recorder.New(config, evts).Start()
27+
ctrl := observer.New(config, evts).Start()
2928

3029
http, err := health.New(config).Start()
3130
if err != nil {

0 commit comments

Comments
 (0)