Skip to content

Commit ab4a567

Browse files
committed
Clean controllers shutdown
We didn't wait for the controllers shutdown before the program exited, let's handle this properly. Also, fix a few comments and log healthcheck start and stop.
1 parent 2f84cb2 commit ab4a567

File tree

4 files changed

+41
-23
lines changed

4 files changed

+41
-23
lines changed

pkg/controller/controller.go

+25-8
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ type Event struct {
3838

3939
// Controller is a generic kubernetes controller
4040
type Controller struct {
41+
stopCh chan struct{}
42+
doneCh chan struct{}
4143
evchan chan Event
4244
name string
4345
config *config.KdnConfig
@@ -77,27 +79,42 @@ func NewController(lw cache.ListerWatcher, evchan chan Event, name string, confi
7779
},
7880
})
7981

80-
return &Controller{evchan, name, config, queue, informer}
82+
return &Controller{
83+
stopCh: make(chan struct{}),
84+
doneCh: make(chan struct{}),
85+
evchan: evchan,
86+
name: name,
87+
config: config,
88+
queue: queue,
89+
informer: informer,
90+
}
8191
}
8292

83-
// Run starts the controller in the foreground
84-
func (c *Controller) Run(stopCh <-chan struct{}) {
93+
// Start launchs the controller in the background
94+
func (c *Controller) Start() {
8595
c.config.Logger.Infof("Starting %s controller", c.name)
8696
defer utilruntime.HandleCrash()
87-
defer c.queue.ShutDown()
8897

89-
go c.informer.Run(stopCh)
98+
go c.informer.Run(c.stopCh)
9099

91-
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
100+
if !cache.WaitForCacheSync(c.stopCh, c.informer.HasSynced) {
92101
utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
93102
return
94103
}
95104

96-
wait.Until(c.runWorker, time.Second, stopCh)
97-
// XXX needs a sync.wg to wait for that
105+
go wait.Until(c.runWorker, time.Second, c.stopCh)
106+
}
107+
108+
// Stop halts the controller
109+
func (c *Controller) Stop() {
110+
close(c.stopCh)
111+
c.queue.ShutDown()
112+
<-c.doneCh
113+
c.config.Logger.Infof("Stopping %s controller", c.name)
98114
}
99115

100116
func (c *Controller) runWorker() {
117+
defer close(c.doneCh)
101118
for c.processNextItem() {
102119
// continue looping
103120
}

pkg/controller/observer.go

+8-10
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type Observer struct {
2525
evch chan Event
2626
disc *discovery.DiscoveryClient
2727
cpool dynamic.ClientPool
28-
ctrls map[string]chan<- struct{}
28+
ctrls map[string]*Controller
2929
config *config.KdnConfig
3030
}
3131

@@ -46,7 +46,7 @@ func NewObserver(config *config.KdnConfig, evch chan Event) *Observer {
4646
evch: evch,
4747
disc: discovery.NewDiscoveryClientForConfigOrDie(config.Client),
4848
cpool: dynamic.NewDynamicClientPool(config.Client),
49-
ctrls: make(map[string]chan<- struct{}),
49+
ctrls: make(map[string]*Controller),
5050
}
5151
}
5252

@@ -85,8 +85,8 @@ func (c *Observer) Stop() {
8585

8686
close(c.stop)
8787

88-
for _, ch := range c.ctrls {
89-
close(ch)
88+
for _, c := range c.ctrls {
89+
c.Stop()
9090
}
9191

9292
<-c.done
@@ -120,10 +120,8 @@ func (c *Observer) refresh() error {
120120
},
121121
}
122122

123-
stop := make(chan struct{})
124-
c.ctrls[name] = stop
125-
name := strings.ToLower(res.ar.Kind)
126-
go NewController(lw, c.evch, name, c.config).Run(stop)
123+
c.ctrls[name] = NewController(lw, c.evch, strings.ToLower(res.ar.Kind), c.config)
124+
go c.ctrls[name].Start()
127125
}
128126

129127
return nil
@@ -160,8 +158,8 @@ func (c *Observer) expandAndFilterAPIResources(groups []*metav1.APIResourceList)
160158
continue
161159
}
162160

163-
g := &gvk{group: gv.Group, version: gv.Version, kind: ar.Kind, gv: gv, ar: ar}
164-
resources[strings.ToLower(gv.Group+":"+ar.Kind)] = g
161+
resource := &gvk{group: gv.Group, version: gv.Version, kind: ar.Kind, gv: gv, ar: ar}
162+
resources[strings.ToLower(gv.Group+":"+ar.Kind)] = resource
165163
}
166164
}
167165

pkg/health/health.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ func (h *Listener) Start() (*Listener, error) {
3838
return h, nil
3939
}
4040

41+
h.config.Logger.Info("Starting http healtcheck handler")
42+
4143
h.srv = &http.Server{Addr: fmt.Sprintf(":%d", h.config.HealthPort)}
4244

4345
http.HandleFunc("/health", h.healthCheckReply)
@@ -52,11 +54,12 @@ func (h *Listener) Start() (*Listener, error) {
5254

5355
// Stop halts the http health check handler
5456
func (h *Listener) Stop() {
55-
h.config.Logger.Info("Stopping http healtcheck handler")
5657
if h.srv == nil {
5758
return
5859
}
5960

61+
h.config.Logger.Info("Stopping http healtcheck handler")
62+
6063
err := h.srv.Shutdown(context.TODO())
6164
if err != nil {
6265
h.config.Logger.Warningf("failed to stop http healtcheck handler: %v", err)

pkg/run/run.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// Package run implements the main katafygio's loop, by
2-
// launching the healthcheck service and all known controllers.
2+
// the services and controllers.
33
package run
44

55
import (
@@ -14,7 +14,7 @@ import (
1414
"github.com/bpineau/katafygio/pkg/store/git"
1515
)
1616

17-
// Run launchs the effective controllers goroutines
17+
// Run launchs the services
1818
func Run(config *config.KdnConfig) {
1919
repo, err := git.New(config).Start()
2020
if err != nil {
@@ -23,7 +23,7 @@ func Run(config *config.KdnConfig) {
2323

2424
evchan := make(chan controller.Event)
2525

26-
rec := recorder.New(config, evchan).Start()
26+
reco := recorder.New(config, evchan).Start()
2727
ctrl := controller.NewObserver(config, evchan).Start()
2828

2929
http, err := health.New(config).Start()
@@ -38,6 +38,6 @@ func Run(config *config.KdnConfig) {
3838

3939
ctrl.Stop()
4040
repo.Stop()
41-
rec.Stop()
41+
reco.Stop()
4242
http.Stop()
4343
}

0 commit comments

Comments
 (0)