Commit bb6b487
Author: Spike Curtis
Parent: e16464e

Use buffered channel instead of Mutex

Signed-off-by: Spike Curtis <[email protected]>
1 file changed: +41 -62 lines

pkg/controllers/node/node_controller.go (+41 -62)
@@ -16,7 +16,6 @@ package node
 
 import (
 	"context"
-	"sync"
 	"time"
 
 	log "github.com/sirupsen/logrus"
@@ -35,6 +34,12 @@ import (
 	"github.com/projectcalico/libcalico-go/lib/options"
 )
 
+const (
+	RateLimitCalicoList   = "calico-list"
+	RateLimitK8s          = "k8s"
+	RateLimitCalicoDelete = "calico-delete"
+)
+
 // NodeController implements the Controller interface. It is responsible for monitoring
 // kubernetes nodes and responding to delete events by removing them from the Calico datastore.
 type NodeController struct {
@@ -44,17 +49,14 @@ type NodeController struct {
 	k8sClientset *kubernetes.Clientset
 	rl           workqueue.RateLimiter
 	schedule     chan interface{}
-
-	// the two bools are protected by the Mutex.
-	m              sync.Mutex
-	syncInProgress bool
-	syncScheduled  bool
 }
 
 // NewNodeController Constructor for NodeController
 func NewNodeController(ctx context.Context, k8sClientset *kubernetes.Clientset, calicoClient client.Interface) controller.Controller {
-	// channel used to kick the controller into scheduling a sync
-	schedule := make(chan interface{})
+	// channel used to kick the controller into scheduling a sync. It has length
+	// 1 so that we coalesce multiple kicks while a sync is happening down to
+	// just one additional sync.
+	schedule := make(chan interface{}, 1)
 
 	// Create a Node watcher.
 	listWatcher := cache.NewListWatchFromClient(k8sClientset.CoreV1().RESTClient(), "nodes", "", fields.Everything())
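The new comment above describes the coalescing behaviour of the length-1 schedule channel. As a standalone sketch (not part of this commit), the snippet below shows how non-blocking sends into such a channel collapse any number of kicks issued while a sync is running into at most one pending item; the local kick helper mirrors the one added at the bottom of this diff.

	package main

	import (
		"fmt"
		"time"
	)

	// kick mirrors the helper introduced in this commit: a non-blocking send,
	// so a kick is a no-op when one is already pending.
	func kick(c chan<- interface{}) {
		select {
		case c <- nil:
		default:
		}
	}

	func main() {
		// Length 1, as in the controller's schedule channel.
		schedule := make(chan interface{}, 1)

		// Several kicks arriving while nothing is draining the channel...
		kick(schedule)
		kick(schedule)
		kick(schedule)

		// ...coalesce into a single pending item.
		fmt.Println("pending:", len(schedule)) // pending: 1

		// Draining it yields exactly one follow-up sync; nothing else is queued.
		<-schedule
		select {
		case <-schedule:
			fmt.Println("unexpected second item")
		case <-time.After(10 * time.Millisecond):
			fmt.Println("channel empty again")
		}
	}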
@@ -64,7 +66,7 @@ func NewNodeController(ctx context.Context, k8sClientset *kubernetes.Clientset,
 		DeleteFunc: func(obj interface{}) {
 			// Just kick controller to wake up and perform a sync. No need to bother what node it was
 			// as we sync everything.
-			schedule <- nil
+			kick(schedule)
 		},
 	}, cache.Indexers{})
 
@@ -100,6 +102,7 @@ func (c *NodeController) Run(threadiness int, reconcilerPeriod string, stopCh ch
 	go c.informer.Run(stopCh)
 	log.Debug("Waiting to sync with Kubernetes API (Nodes)")
 	for !c.informer.HasSynced() {
+		time.Sleep(100 * time.Millisecond)
 	}
 	log.Debug("Finished syncing with Kubernetes API (Nodes)")
 
@@ -108,8 +111,9 @@ func (c *NodeController) Run(threadiness int, reconcilerPeriod string, stopCh ch
 
 	log.Info("Node controller is now running")
 
-	// Kick off a start of day sync.
-	c.schedule <- nil
+	// Kick off a start of day sync. Write non-blocking so that if a sync is
+	// already scheduled, we don't schedule another.
+	kick(c.schedule)
 
 	<-stopCh
 	log.Info("Stopping Node controller")
@@ -122,53 +126,15 @@ func (c *NodeController) acceptScheduleRequests(stopCh <-chan struct{}) {
 		// Wait until something wakes us up, or we are stopped
 		select {
 		case <-c.schedule:
-			c.doSchedule(stopCh)
-		case <-stopCh:
-			return
-		}
-	}
-}
-
-// doSchedule actually performs the scheduling of syncs. It is a separate method
-// so that we don't introduce locking into the acceptScheduleRequests method.
-func (c *NodeController) doSchedule(stopCh <-chan struct{}) {
-	c.m.Lock()
-	defer c.m.Unlock()
-	c.syncScheduled = true
-	if c.syncInProgress {
-		return
-	}
-	c.syncInProgress = true
-	go c.syncUntilDone(stopCh)
-}
-
-// syncUntilDone kicks off the sync and handles re-synching if something schedules
-// a sync while one is in progress. This method assumes the syncInProgress
-// and syncScheduled flags are set when it is called.
-func (c *NodeController) syncUntilDone(stopCh <-chan struct{}) {
-	for {
-		// Maybe stop?
-		select {
+			err := c.syncDelete()
+			if err != nil {
+				// Reschedule the sync since we hit an error. Note that
+				// syncDelete() does its own rate limiting, so it's fine to
+				// reschedule immediately.
+				kick(c.schedule)
+			}
 		case <-stopCh:
 			return
-		default:
-			c.m.Lock()
-			if c.syncScheduled {
-				c.syncScheduled = false
-				c.m.Unlock()
-				err := c.syncDelete()
-				if err != nil {
-					// If we hit an error, reschedule another sync. SyncDelete
-					// handles its own rate limiting.
-					c.m.Lock()
-					c.syncScheduled = true
-					c.m.Unlock()
-				}
-			} else {
-				c.syncInProgress = false
-				c.m.Unlock()
-				return
-			}
 		}
 	}
 }
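The comment in the new select branch leans on the per-key rate limiting that syncDelete() performs through c.rl. As a rough standalone sketch (not part of this commit, and assuming the controller is built with client-go's default controller rate limiter, so the exact delays depend on that configuration), repeated When() calls for the same key back off, and Forget() resets that key:

	package main

	import (
		"fmt"

		"k8s.io/client-go/util/workqueue"
	)

	// Local stand-in for the RateLimitCalicoList constant defined in the diff.
	const rateLimitCalicoList = "calico-list"

	func main() {
		// Default controller rate limiter: per-item exponential backoff
		// combined with an overall token bucket.
		rl := workqueue.DefaultControllerRateLimiter()

		// Calling When() repeatedly for the same key without Forget()
		// (as happens when syncDelete keeps failing) grows the delay.
		fmt.Println(rl.When(rateLimitCalicoList)) // typically 5ms
		fmt.Println(rl.When(rateLimitCalicoList)) // typically 10ms
		fmt.Println(rl.When(rateLimitCalicoList)) // typically 20ms

		// A successful pass calls Forget(), resetting the backoff for that key.
		rl.Forget(rateLimitCalicoList)
		fmt.Println(rl.When(rateLimitCalicoList)) // back to the base delay
	}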
@@ -177,21 +143,21 @@ func (c *NodeController) syncUntilDone(stopCh <-chan struct{}) {
 // K8s, and deletes any Calico nodes which do not exist in K8s.
 func (c *NodeController) syncDelete() error {
 	// Possibly rate limit calls to Calico
-	time.Sleep(c.rl.When("calico-list"))
+	time.Sleep(c.rl.When(RateLimitCalicoList))
 	cNodes, err := c.calicoClient.Nodes().List(c.ctx, options.ListOptions{})
 	if err != nil {
 		log.WithError(err).Errorf("Error listing Calico nodes", err)
 		return err
 	}
-	c.rl.Forget("calico")
+	c.rl.Forget(RateLimitCalicoList)
 
-	time.Sleep(c.rl.When("k8s"))
+	time.Sleep(c.rl.When(RateLimitK8s))
 	kNodes, err := c.k8sClientset.CoreV1().Nodes().List(meta_v1.ListOptions{})
 	if err != nil {
 		log.WithError(err).Errorf("Error listing K8s nodes", err)
 		return err
 	}
-	c.rl.Forget("k8s")
+	c.rl.Forget(RateLimitK8s)
 	kNodeIdx := make(map[string]bool)
 	for _, node := range kNodes.Items {
 		kNodeIdx[node.Name] = true
@@ -201,15 +167,28 @@ func (c *NodeController) syncDelete() error {
 		k8sNodeName := getK8sNodeName(node)
 		if k8sNodeName != "" && !kNodeIdx[k8sNodeName] {
 			// No matching Kubernetes node with that name
-			time.Sleep(c.rl.When("calico-delete"))
+			time.Sleep(c.rl.When(RateLimitCalicoDelete))
 			_, err := c.calicoClient.Nodes().Delete(c.ctx, node.Name, options.DeleteOptions{})
 			if _, doesNotExist := err.(errors.ErrorResourceDoesNotExist); err != nil && !doesNotExist {
 				// We hit an error other than "does not exist".
 				log.WithError(err).Errorf("Error deleting Calico node: %v", node.Name, err)
 				return err
 			}
-			c.rl.Forget("calico-delete")
+			c.rl.Forget(RateLimitCalicoDelete)
 		}
 	}
 	return nil
 }
+
+// kick puts an item on the channel in non-blocking write. This means if there
+// is already something pending, it has no effect. This allows us to coalesce
+// multiple requests into a single pending request.
+func kick(c chan<- interface{}) {
+	select {
+	case c <- nil:
+		// pass
+	default:
+		// pass
+	}
+}
