Skip to content

Commit 881d036

Browse files
committed
Expect event.Notifier interface instead of concrete
Sames goes for discovery.DiscoveryInterface (rather than discovery.DiscoveryClient). Will make unit tests easier.
1 parent 1899edf commit 881d036

File tree

5 files changed

+30
-18
lines changed

5 files changed

+30
-18
lines changed

pkg/controller/controller.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ const maxProcessRetry = 6
3030
type Controller struct {
3131
stopCh chan struct{}
3232
doneCh chan struct{}
33-
notifier *event.Notifier
33+
notifier event.Notifier
3434
config *config.KfConfig
3535
name string
3636
queue workqueue.RateLimitingInterface
3737
informer cache.SharedIndexInformer
3838
}
3939

4040
// New return a kubernetes controller using the provided client
41-
func New(client cache.ListerWatcher, notifier *event.Notifier, name string, config *config.KfConfig) *Controller {
41+
func New(client cache.ListerWatcher, notifier event.Notifier, name string, config *config.KfConfig) *Controller {
4242

4343
selector := metav1.ListOptions{LabelSelector: config.Filter}
4444
lw := &cache.ListWatch{

pkg/event/event.go

+19-8
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,29 @@ type Notification struct {
2121
}
2222

2323
// Notifier mediates notifications between controllers and recorder
24-
type Notifier struct {
25-
C chan Notification
24+
type Notifier interface {
25+
Send(notif *Notification)
26+
ReadChan() <-chan Notification
2627
}
2728

28-
// New creates a new event.Notifier
29-
func New() *Notifier {
30-
return &Notifier{
31-
C: make(chan Notification),
29+
// Unbuffered implements Notifier
30+
type Unbuffered struct {
31+
c chan Notification
32+
}
33+
34+
// New creates an Unbuffered
35+
func New() *Unbuffered {
36+
return &Unbuffered{
37+
c: make(chan Notification),
3238
}
3339
}
3440

3541
// Send sends a notification
36-
func (n *Notifier) Send(notif *Notification) {
37-
n.C <- *notif
42+
func (n *Unbuffered) Send(notif *Notification) {
43+
n.c <- *notif
44+
}
45+
46+
// ReadChan returns a channel to read Notifications from
47+
func (n *Unbuffered) ReadChan() <-chan Notification {
48+
return n.c
3849
}

pkg/observer/observer.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ const discoveryInterval = 60 * time.Second
2525
type Observer struct {
2626
stop chan struct{}
2727
done chan struct{}
28-
notif *event.Notifier
29-
disc *discovery.DiscoveryClient
28+
notif event.Notifier
29+
disc discovery.DiscoveryInterface
3030
cpool dynamic.ClientPool
3131
ctrls map[string]*controller.Controller
3232
config *config.KfConfig
@@ -40,7 +40,7 @@ type gvk struct {
4040
type resources map[string]*gvk
4141

4242
// New returns a new observer, that will watch API resources and create controllers
43-
func New(config *config.KfConfig, notif *event.Notifier) *Observer {
43+
func New(config *config.KfConfig, notif event.Notifier) *Observer {
4444
return &Observer{
4545
config: config,
4646
notif: notif,

pkg/recorder/recorder.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ type activeFiles map[string]bool
2222
// Listener receive events from controllers and save them to disk as yaml files
2323
type Listener struct {
2424
config *config.KfConfig
25-
events *event.Notifier
25+
events event.Notifier
2626
actives activeFiles
2727
activesLock sync.RWMutex
2828
stopch chan struct{}
2929
donech chan struct{}
3030
}
3131

3232
// New creates a new Listener
33-
func New(config *config.KfConfig, events *event.Notifier) *Listener {
33+
func New(config *config.KfConfig, events event.Notifier) *Listener {
3434
return &Listener{
3535
config: config,
3636
events: events,
@@ -47,6 +47,7 @@ func (w *Listener) Start() *Listener {
4747
}
4848

4949
go func() {
50+
evCh := w.events.ReadChan()
5051
gcTick := time.NewTicker(w.config.ResyncIntv * 2)
5152
w.stopch = make(chan struct{})
5253
w.donech = make(chan struct{})
@@ -57,7 +58,7 @@ func (w *Listener) Start() *Listener {
5758
select {
5859
case <-w.stopch:
5960
return
60-
case ev := <-w.events.C:
61+
case ev := <-evCh:
6162
w.processNextEvent(&ev)
6263
case <-gcTick.C:
6364
w.deleteObsoleteFiles()

pkg/run/run.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func Run(config *config.KfConfig) {
2424

2525
evts := event.New()
2626
reco := recorder.New(config, evts).Start()
27-
ctrl := observer.New(config, evts).Start()
27+
obsv := observer.New(config, evts).Start()
2828

2929
http, err := health.New(config).Start()
3030
if err != nil {
@@ -36,7 +36,7 @@ func Run(config *config.KfConfig) {
3636
signal.Notify(sigterm, syscall.SIGINT)
3737
<-sigterm
3838

39-
ctrl.Stop()
39+
obsv.Stop()
4040
repo.Stop()
4141
reco.Stop()
4242
http.Stop()

0 commit comments

Comments
 (0)