Skip to content

Commit 19c42c5

Browse files
committed
Corrected merge with main
1 parent a68bde1 commit 19c42c5

File tree

2 files changed

+50
-31
lines changed

2 files changed

+50
-31
lines changed

cloud/linode/node_controller.go

+31-14
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ const (
2828
defaultMetadataTTL = 300 * time.Second
2929
)
3030

31+
type nodeRequest struct {
32+
node *v1.Node
33+
timestamp time.Time
34+
}
35+
3136
type nodeController struct {
3237
sync.RWMutex
3338

@@ -39,7 +44,8 @@ type nodeController struct {
3944
metadataLastUpdate map[string]time.Time
4045
ttl time.Duration
4146

42-
queue workqueue.TypedDelayingInterface[any]
47+
queue workqueue.TypedDelayingInterface[nodeRequest]
48+
nodeLastAdded map[string]time.Time
4349
}
4450

4551
func newNodeController(kubeclient kubernetes.Interface, client client.Client, informer v1informers.NodeInformer, instanceCache *instances) *nodeController {
@@ -57,7 +63,8 @@ func newNodeController(kubeclient kubernetes.Interface, client client.Client, in
5763
informer: informer,
5864
ttl: timeout,
5965
metadataLastUpdate: make(map[string]time.Time),
60-
queue: workqueue.NewTypedDelayingQueueWithConfig[any](workqueue.TypedDelayingQueueConfig[any]{Name: "ccm_node"}),
66+
queue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "ccm_node"}),
67+
nodeLastAdded: make(map[string]time.Time),
6168
}
6269
}
6370

@@ -71,7 +78,7 @@ func (s *nodeController) Run(stopCh <-chan struct{}) {
7178
}
7279

7380
klog.Infof("NodeController will handle newly created node (%s) metadata", node.Name)
74-
s.queue.Add(node)
81+
s.addNodeToQueue(node)
7582
},
7683
UpdateFunc: func(oldObj, newObj interface{}) {
7784
node, ok := newObj.(*v1.Node)
@@ -80,7 +87,7 @@ func (s *nodeController) Run(stopCh <-chan struct{}) {
8087
}
8188

8289
klog.Infof("NodeController will handle newly updated node (%s) metadata", node.Name)
83-
s.queue.Add(node)
90+
s.addNodeToQueue(node)
8491
},
8592
},
8693
informerResyncPeriod,
@@ -92,6 +99,15 @@ func (s *nodeController) Run(stopCh <-chan struct{}) {
9299
s.informer.Informer().Run(stopCh)
93100
}
94101

102+
// addNodeToQueue adds a node to the queue for processing.
103+
func (s *nodeController) addNodeToQueue(node *v1.Node) {
104+
s.Lock()
105+
defer s.Unlock()
106+
currTime := time.Now()
107+
s.nodeLastAdded[node.Name] = currTime
108+
s.queue.Add(nodeRequest{node: node, timestamp: currTime})
109+
}
110+
95111
// worker runs a worker thread that dequeues new or modified nodes and processes
96112
// metadata (host UUID) on each of them.
97113
func (s *nodeController) worker() {
@@ -100,30 +116,31 @@ func (s *nodeController) worker() {
100116
}
101117

102118
func (s *nodeController) processNext() bool {
103-
key, quit := s.queue.Get()
119+
request, quit := s.queue.Get()
104120
if quit {
105121
return false
106122
}
107-
defer s.queue.Done(key)
123+
defer s.queue.Done(request)
108124

109-
node, ok := key.(*v1.Node)
110-
if !ok {
111-
klog.Errorf("expected dequeued key to be of type *v1.Node but got %T", node)
125+
s.RLock()
126+
latestTimestamp, exists := s.nodeLastAdded[request.node.Name]
127+
s.RUnlock()
128+
if !exists || request.timestamp.Before(latestTimestamp) {
129+
klog.V(3).InfoS("Skipping node metadata update as its not the most recent object", "node", klog.KObj(request.node))
112130
return true
113131
}
114-
115-
err := s.handleNode(context.TODO(), node)
132+
err := s.handleNode(context.TODO(), request.node)
116133
if err != nil {
117134
//nolint: errorlint //switching to errors.Is()/errors.As() causes errors with Code field
118135
switch deleteErr := err.(type) {
119136
case *linodego.Error:
120137
if deleteErr.Code >= http.StatusInternalServerError || deleteErr.Code == http.StatusTooManyRequests {
121-
klog.Errorf("failed to add metadata for node (%s); retrying in 1 minute: %s", node.Name, err)
122-
s.queue.AddAfter(node, retryInterval)
138+
klog.Errorf("failed to add metadata for node (%s); retrying in 1 minute: %s", request.node.Name, err)
139+
s.queue.AddAfter(request, retryInterval)
123140
}
124141

125142
default:
126-
klog.Errorf("failed to add metadata for node (%s); will not retry: %s", node.Name, err)
143+
klog.Errorf("failed to add metadata for node (%s); will not retry: %s", request.node.Name, err)
127144
}
128145
}
129146
return true

cloud/linode/node_controller_test.go

+19-17
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestNodeController_Run(t *testing.T) {
2828
client := mocks.NewMockClient(ctrl)
2929
kubeClient := fake.NewSimpleClientset()
3030
informer := informers.NewSharedInformerFactory(kubeClient, 0).Core().V1().Nodes()
31-
mockQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "test"})
31+
mockQueue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "test"})
3232

3333
nodeCtrl := newNodeController(kubeClient, client, informer, newInstances(client))
3434
nodeCtrl.queue = mockQueue
@@ -68,7 +68,7 @@ func TestNodeController_processNext(t *testing.T) {
6868
defer ctrl.Finish()
6969
client := mocks.NewMockClient(ctrl)
7070
kubeClient := fake.NewSimpleClientset()
71-
queue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "testQueue"})
71+
queue := workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "testQueue"})
7272
node := &v1.Node{
7373
ObjectMeta: metav1.ObjectMeta{
7474
Name: "test",
@@ -87,10 +87,11 @@ func TestNodeController_processNext(t *testing.T) {
8787
queue: queue,
8888
metadataLastUpdate: make(map[string]time.Time),
8989
ttl: defaultMetadataTTL,
90+
nodeLastAdded: make(map[string]time.Time),
9091
}
9192

9293
t.Run("should return no error on unknown errors", func(t *testing.T) {
93-
queue.Add(node)
94+
controller.addNodeToQueue(node)
9495
client.EXPECT().ListInstances(gomock.Any(), nil).Times(1).Return([]linodego.Instance{}, errors.New("lookup failed"))
9596
result := controller.processNext()
9697
assert.True(t, result, "processNext should return true")
@@ -99,22 +100,23 @@ func TestNodeController_processNext(t *testing.T) {
99100
}
100101
})
101102

102-
t.Run("should return no error if node exists", func(t *testing.T) {
103-
queue.Add(node)
104-
publicIP := net.ParseIP("172.234.31.123")
105-
privateIP := net.ParseIP("192.168.159.135")
106-
client.EXPECT().ListInstances(gomock.Any(), nil).Times(1).Return([]linodego.Instance{
107-
{ID: 111, Label: "test", IPv4: []*net.IP{&publicIP, &privateIP}, HostUUID: "111"},
108-
}, nil)
103+
t.Run("should return no error if timestamp for node being processed is older than the most recent request", func(t *testing.T) {
104+
controller.addNodeToQueue(node)
105+
controller.nodeLastAdded["test"] = time.Now().Add(controller.ttl)
109106
result := controller.processNext()
110107
assert.True(t, result, "processNext should return true")
111108
if queue.Len() != 0 {
112109
t.Errorf("expected queue to be empty, got %d items", queue.Len())
113110
}
114111
})
115112

116-
t.Run("should return no error if queued object is not of type Node", func(t *testing.T) {
117-
queue.Add("abc")
113+
t.Run("should return no error if node exists", func(t *testing.T) {
114+
controller.addNodeToQueue(node)
115+
publicIP := net.ParseIP("172.234.31.123")
116+
privateIP := net.ParseIP("192.168.159.135")
117+
client.EXPECT().ListInstances(gomock.Any(), nil).Times(1).Return([]linodego.Instance{
118+
{ID: 111, Label: "test", IPv4: []*net.IP{&publicIP, &privateIP}, HostUUID: "111"},
119+
}, nil)
118120
result := controller.processNext()
119121
assert.True(t, result, "processNext should return true")
120122
if queue.Len() != 0 {
@@ -123,7 +125,7 @@ func TestNodeController_processNext(t *testing.T) {
123125
})
124126

125127
t.Run("should return no error if node in k8s doesn't exist", func(t *testing.T) {
126-
queue.Add(node)
128+
controller.addNodeToQueue(node)
127129
controller.kubeclient = fake.NewSimpleClientset()
128130
defer func() { controller.kubeclient = kubeClient }()
129131
result := controller.processNext()
@@ -134,9 +136,9 @@ func TestNodeController_processNext(t *testing.T) {
134136
})
135137

136138
t.Run("should return error and requeue when it gets 429 from linode API", func(t *testing.T) {
137-
queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "testQueue1"})
138-
queue.Add(node)
139+
queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "testQueue1"})
139140
controller.queue = queue
141+
controller.addNodeToQueue(node)
140142
client := mocks.NewMockClient(ctrl)
141143
controller.instances = newInstances(client)
142144
retryInterval = 1 * time.Nanosecond
@@ -150,9 +152,9 @@ func TestNodeController_processNext(t *testing.T) {
150152
})
151153

152154
t.Run("should return error and requeue when it gets error >= 500 from linode API", func(t *testing.T) {
153-
queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "testQueue2"})
154-
queue.Add(node)
155+
queue = workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[nodeRequest]{Name: "testQueue2"})
155156
controller.queue = queue
157+
controller.addNodeToQueue(node)
156158
client := mocks.NewMockClient(ctrl)
157159
controller.instances = newInstances(client)
158160
retryInterval = 1 * time.Nanosecond

0 commit comments

Comments
 (0)