Commit 375bd8e

[fix] : add k8s node cache and prefer providerID for node search (#353)

* add k8s node cache and prefer providerID for node search
* fix failing unittests
* add e2e test for updating instance label and making sure route still exists
* update docs and add comments
* fix capacity of map initialized
* fix linting error

1 parent 0813a06 · commit 375bd8e
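
In short, per the e2e test added in this change: if a Linode's label is edited so it no longer matches the registered Kubernetes node name, name-based lookups break and the node's routes would be dropped. The fix caches nodes as registered in Kubernetes and prefers the immutable providerID when resolving a Linode instance back to its node.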

File tree: 6 files changed, +350 −12 lines changed (diffs for three of the six files are shown below)


cloud/linode/node_controller.go (+110 −6)

```diff
@@ -24,10 +24,13 @@ import (
 )
 
 const (
-	informerResyncPeriod = 1 * time.Minute
-	defaultMetadataTTL   = 300 * time.Second
+	informerResyncPeriod   = 1 * time.Minute
+	defaultMetadataTTL     = 300 * time.Second
+	defaultK8sNodeCacheTTL = 300 * time.Second
 )
 
+var registeredK8sNodeCache *k8sNodeCache = newK8sNodeCache()
+
 type nodeRequest struct {
 	node      *v1.Node
 	timestamp time.Time
@@ -48,6 +51,104 @@ type nodeController struct {
 	nodeLastAdded map[string]time.Time
 }
 
+// k8sNodeCache stores node related info as registered in k8s
+type k8sNodeCache struct {
+	sync.RWMutex
+	nodes       map[string]*v1.Node
+	providerIDs map[string]string
+	lastUpdate  time.Time
+	ttl         time.Duration
+}
+
+// updateCache updates the k8s node cache with the latest nodes from the k8s API server.
+func (c *k8sNodeCache) updateCache(kubeclient kubernetes.Interface) {
+	c.Lock()
+	defer c.Unlock()
+	if time.Since(c.lastUpdate) < c.ttl {
+		return
+	}
+
+	nodeList, err := kubeclient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		klog.Errorf("failed to list nodes, cannot create/update k8s node cache: %s", err)
+		return
+	}
+
+	nodes := make(map[string]*v1.Node, len(nodeList.Items))
+	providerIDs := make(map[string]string, len(nodeList.Items))
+	for _, node := range nodeList.Items {
+		if node.Spec.ProviderID == "" {
+			klog.Errorf("Empty providerID [%s] for node %s, skipping it", node.Spec.ProviderID, node.Name)
+			continue
+		}
+		nodes[node.Name] = &node
+		providerIDs[node.Spec.ProviderID] = node.Name
+	}
+
+	c.nodes = nodes
+	c.providerIDs = providerIDs
+	c.lastUpdate = time.Now()
+}
+
+// addNodeToCache stores the specified node in k8s node cache
+func (c *k8sNodeCache) addNodeToCache(node *v1.Node) {
+	c.Lock()
+	defer c.Unlock()
+	if node.Spec.ProviderID == "" {
+		klog.Errorf("Empty providerID [%s] for node %s, skipping it", node.Spec.ProviderID, node.Name)
+		return
+	}
+	c.nodes[node.Name] = node
+	c.providerIDs[node.Spec.ProviderID] = node.Name
+}
+
+// getNodeLabel returns the k8s node label for the given provider ID or instance label.
+// If the provider ID or label is not found in the cache, it returns an empty string and false.
+func (c *k8sNodeCache) getNodeLabel(providerID string, instanceLabel string) (string, bool) {
+	c.RLock()
+	defer c.RUnlock()
+
+	// check if instance label matches with the registered k8s node
+	if _, exists := c.nodes[instanceLabel]; exists {
+		return instanceLabel, true
+	}
+
+	// check if provider id matches with the registered k8s node
+	if label, exists := c.providerIDs[providerID]; exists {
+		return label, true
+	}
+
+	return "", false
+}
+
+// getProviderID returns linode specific providerID for given k8s node name
+func (c *k8sNodeCache) getProviderID(nodeName string) (string, bool) {
+	c.RLock()
+	defer c.RUnlock()
+
+	if node, exists := c.nodes[nodeName]; exists {
+		return node.Spec.ProviderID, true
+	}
+
+	return "", false
+}
+
+// newK8sNodeCache returns new k8s node cache instance
+func newK8sNodeCache() *k8sNodeCache {
+	timeout := defaultK8sNodeCacheTTL
+	if raw, ok := os.LookupEnv("K8S_NODECACHE_TTL"); ok {
+		if t, err := strconv.Atoi(raw); t > 0 && err == nil {
+			timeout = time.Duration(t) * time.Second
+		}
+	}
+
+	return &k8sNodeCache{
+		nodes:       make(map[string]*v1.Node, 0),
+		providerIDs: make(map[string]string, 0),
+		ttl:         timeout,
+	}
+}
+
 func newNodeController(kubeclient kubernetes.Interface, client client.Client, informer v1informers.NodeInformer, instanceCache *instances) *nodeController {
 	timeout := defaultMetadataTTL
 	if raw, ok := os.LookupEnv("LINODE_METADATA_TTL"); ok {
@@ -145,6 +246,7 @@ func (s *nodeController) processNext() bool {
 		klog.Errorf("failed to add metadata for node (%s); will not retry: %s", request.node.Name, err)
 	}
 
+	registeredK8sNodeCache.updateCache(s.kubeclient)
 	return true
 }
 
@@ -187,9 +289,7 @@ func (s *nodeController) handleNode(ctx context.Context, node *v1.Node) error {
 
 	expectedPrivateIP := ""
 	// linode API response for linode will contain only one private ip
-	// if any private ip is configured. If it changes in future or linode
-	// supports other subnets with nodebalancer, this logic needs to be updated.
-	// https://www.linode.com/docs/api/linode-instances/#linode-view
+	// if any private ip is configured.
 	for _, addr := range linode.IPv4 {
 		if isPrivate(addr) {
 			expectedPrivateIP = addr.String()
@@ -202,6 +302,7 @@ func (s *nodeController) handleNode(ctx context.Context, node *v1.Node) error {
 		return nil
 	}
 
+	var updatedNode *v1.Node
 	if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
 		// Get a fresh copy of the node so the resource version is up-to-date
 		nodeResult, err := s.kubeclient.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
@@ -223,13 +324,16 @@ func (s *nodeController) handleNode(ctx context.Context, node *v1.Node) error {
 		if nodeResult.Annotations[annotations.AnnLinodeNodePrivateIP] != expectedPrivateIP && expectedPrivateIP != "" {
 			nodeResult.Annotations[annotations.AnnLinodeNodePrivateIP] = expectedPrivateIP
 		}
-		_, err = s.kubeclient.CoreV1().Nodes().Update(ctx, nodeResult, metav1.UpdateOptions{})
+		updatedNode, err = s.kubeclient.CoreV1().Nodes().Update(ctx, nodeResult, metav1.UpdateOptions{})
 		return err
 	}); err != nil {
 		klog.V(1).ErrorS(err, "Node update error")
 		return err
 	}
 
+	if updatedNode != nil {
+		registeredK8sNodeCache.addNodeToCache(updatedNode)
+	}
 	s.SetLastMetadataUpdate(node.Name)
 
 	return nil
```
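
The crux of the fix is the lookup order in getNodeLabel: the Linode instance label is trusted only while it still matches a registered Kubernetes node name; otherwise the immutable providerID recovers the node. The cache TTL defaults to 300 seconds (defaultK8sNodeCacheTTL) and can be overridden in whole seconds via the K8S_NODECACHE_TTL environment variable, mirroring the existing LINODE_METADATA_TTL pattern. Below is a minimal standalone sketch of that lookup order; the type, names, and values are illustrative, not from the repository:

```go
// Minimal sketch of the two-step resolution the new cache implements:
// try the instance label against registered k8s node names first,
// then fall back to the providerID mapping.
package main

import "fmt"

type nodeCache struct {
	nodes       map[string]struct{} // registered k8s node names
	providerIDs map[string]string   // providerID -> k8s node name
}

func (c *nodeCache) getNodeLabel(providerID, instanceLabel string) (string, bool) {
	if _, ok := c.nodes[instanceLabel]; ok {
		return instanceLabel, true // Linode label still matches a k8s node
	}
	if label, ok := c.providerIDs[providerID]; ok {
		return label, true // label was changed; recover the node via providerID
	}
	return "", false
}

func main() {
	c := &nodeCache{
		nodes:       map[string]struct{}{"worker-1": {}},
		providerIDs: map[string]string{"linode://112": "worker-1"},
	}
	// The instance label was renamed in the Linode API,
	// but the providerID still resolves to the registered node.
	if label, ok := c.getNodeLabel("linode://112", "renamed-instance"); ok {
		fmt.Println("resolved k8s node:", label) // prints "worker-1"
	}
}
```

Running this prints `resolved k8s node: worker-1`, i.e. the renamed instance still resolves to its original node.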

cloud/linode/node_controller_test.go (+68 −0)

```diff
@@ -125,6 +125,34 @@ func TestNodeController_processNext(t *testing.T) {
 		}
 	})
 
+	t.Run("should return no error if node has providerID set", func(t *testing.T) {
+		node2 := &v1.Node{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:        "test-node2",
+				Labels:      map[string]string{},
+				Annotations: map[string]string{},
+			},
+			Spec: v1.NodeSpec{ProviderID: "linode://112"},
+		}
+		currInstances := controller.instances
+		defer func() {
+			controller.instances = currInstances
+		}()
+		controller.instances = newInstances(client)
+		registeredK8sNodeCache.lastUpdate = time.Now().Add(-15 * time.Minute)
+		controller.addNodeToQueue(node2)
+		publicIP := net.ParseIP("172.234.31.123")
+		privateIP := net.ParseIP("192.168.159.135")
+		client.EXPECT().ListInstances(gomock.Any(), nil).Times(1).Return([]linodego.Instance{
+			{ID: 112, Label: "test-node2", IPv4: []*net.IP{&publicIP, &privateIP}, HostUUID: "112"},
+		}, nil)
+		result := controller.processNext()
+		assert.True(t, result, "processNext should return true")
+		if queue.Len() != 0 {
+			t.Errorf("expected queue to be empty, got %d items", queue.Len())
+		}
+	})
+
 	t.Run("should return no error if node in k8s doesn't exist", func(t *testing.T) {
 		controller.addNodeToQueue(node)
 		controller.kubeclient = fake.NewSimpleClientset()
@@ -192,6 +220,10 @@ func TestNodeController_handleNode(t *testing.T) {
 	nodeCtrl := newNodeController(kubeClient, client, nil, instCache)
 	assert.Equal(t, 30*time.Second, nodeCtrl.ttl, "expected ttl to be 30 seconds")
 
+	t.Setenv("K8S_NODECACHE_TTL", "60")
+	currK8sNodeCache := newK8sNodeCache()
+	assert.Equal(t, 60*time.Second, currK8sNodeCache.ttl, "expected ttl to be 60 seconds")
+
 	// Test: Successful metadata update
 	publicIP := net.ParseIP("172.234.31.123")
 	privateIP := net.ParseIP("192.168.159.135")
@@ -231,3 +263,39 @@ func TestNodeController_handleNode(t *testing.T) {
 	err = nodeCtrl.handleNode(context.TODO(), node)
 	assert.NoError(t, err, "expected no error during handleNode")
 }
+
+func Test_k8sNodeCache_addNodeToCache(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	// Add node with providerID set
+	node := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        "test-node",
+			Labels:      map[string]string{},
+			Annotations: map[string]string{},
+		},
+		Spec: v1.NodeSpec{ProviderID: "linode://123"},
+	}
+
+	currK8sNodeCache := newK8sNodeCache()
+	currK8sNodeCache.addNodeToCache(node)
+
+	if _, exists := currK8sNodeCache.nodes[node.Name]; !exists {
+		t.Errorf("expected node to be added to cache")
+	}
+
+	// Add node without providerID set
+	node2 := &v1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        "test-node2",
+			Labels:      map[string]string{},
+			Annotations: map[string]string{},
+		},
+	}
+
+	currK8sNodeCache.addNodeToCache(node2)
+	if _, exists := currK8sNodeCache.nodes[node2.Name]; exists {
+		t.Errorf("expected node to not be added to cache")
+	}
+}
```
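
Worth noting: `t.Setenv` restores `K8S_NODECACHE_TTL` automatically when the test completes, so the 60-second override cannot leak into other tests. With standard Go tooling the new cache test can be run in isolation with something like `go test ./cloud/linode/ -run Test_k8sNodeCache -v` from the repository root.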

cloud/linode/route_controller.go (+14 −0)

```diff
@@ -112,6 +112,11 @@ func (r *routes) getInstanceFromName(ctx context.Context, name string) (*linodeg
 		},
 	}
 
+	// fetch providerID from k8s node cache if it exists
+	if id, ok := registeredK8sNodeCache.getProviderID(name); ok {
+		node.Spec.ProviderID = id
+	}
+
 	// fetch instance with specified node name
 	instance, err := r.instances.lookupLinode(ctx, node)
 	if err != nil {
@@ -235,6 +240,15 @@ func (r *routes) ListRoutes(ctx context.Context, clusterName string) ([]*cloudpr
 
 	var configuredRoutes []*cloudprovider.Route
 	for _, instance := range instances {
+		providerID := providerIDPrefix + strconv.Itoa(instance.ID)
+		label, found := registeredK8sNodeCache.getNodeLabel(providerID, instance.Label)
+		if !found {
+			klog.V(4).Infof("Node %s not found in k8s node cache, skipping listing its routes", instance.Label)
+			continue
+		}
+		// Update label to match with k8s registered label
+		instance.Label = label
+
 		instanceRoutes, err := r.getInstanceRoutes(ctx, instance.ID)
 		if err != nil {
 			klog.Errorf("Failed finding routes for instance id %d. Error: %v", instance.ID, err)
```
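
In ListRoutes the cache serves two purposes: instances with no registered Kubernetes node are skipped (so routes are not reported for Linodes outside the cluster), and the instance label is rewritten to the registered node name, keying routes by the Kubernetes identity rather than the mutable Linode label. The resolution hinges on providerIDs of the form `linode://<id>`, as in the tests above. A small sketch of that round-trip; the providerIDPrefix value mirrors the test data, and parseProviderID is a hypothetical helper for illustration only:

```go
// Illustrative round-trip between a Linode instance ID and its providerID.
// buildProviderID mirrors providerIDPrefix + strconv.Itoa(instance.ID) from
// the diff above; parseProviderID is an assumed inverse, not repository code.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const providerIDPrefix = "linode://" // assumed value, matching "linode://112" in the tests

func buildProviderID(instanceID int) string {
	return providerIDPrefix + strconv.Itoa(instanceID)
}

func parseProviderID(providerID string) (int, error) {
	raw, ok := strings.CutPrefix(providerID, providerIDPrefix)
	if !ok {
		return 0, fmt.Errorf("not a linode providerID: %q", providerID)
	}
	return strconv.Atoi(raw)
}

func main() {
	id := buildProviderID(112)
	fmt.Println(id) // linode://112
	n, _ := parseProviderID(id)
	fmt.Println(n) // 112
}
```

A providerID carrying a foreign prefix (e.g. from another cloud provider) simply fails to parse here rather than being guessed at, in the same spirit as the cache's skip-on-missing behavior in ListRoutes.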
