Skip to content

Commit 4e4245c

Browse files
committed
adding context and seprate go routine to cleanup node resources
1 parent c65cb7f commit 4e4245c

35 files changed

+524
-291
lines changed

controllers/crds/cninode_controller.go

Lines changed: 29 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ package crds
1515

1616
import (
1717
"context"
18-
"fmt"
19-
"strings"
2018
"time"
2119

2220
"github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
@@ -27,8 +25,8 @@ import (
2725
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
2826
"github.com/go-logr/logr"
2927
"github.com/prometheus/client_golang/prometheus"
28+
"golang.org/x/sync/semaphore"
3029
v1 "k8s.io/api/core/v1"
31-
"k8s.io/apimachinery/pkg/api/errors"
3230
apierrors "k8s.io/apimachinery/pkg/api/errors"
3331
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3432
"k8s.io/apimachinery/pkg/runtime"
@@ -79,6 +77,7 @@ type CNINodeReconciler struct {
7977
clusterName string
8078
vpcId string
8179
finalizerManager k8s.FinalizerManager
80+
deletePool *semaphore.Weighted
8281
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner
8382
}
8483

@@ -92,6 +91,7 @@ func NewCNINodeReconciler(
9291
clusterName string,
9392
vpcId string,
9493
finalizerManager k8s.FinalizerManager,
94+
maxConcurrentWorkers int,
9595
newResourceCleaner func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner,
9696
) *CNINodeReconciler {
9797
return &CNINodeReconciler{
@@ -104,6 +104,7 @@ func NewCNINodeReconciler(
104104
clusterName: clusterName,
105105
vpcId: vpcId,
106106
finalizerManager: finalizerManager,
107+
deletePool: semaphore.NewWeighted(int64(maxConcurrentWorkers)),
107108
newResourceCleaner: newResourceCleaner,
108109
}
109110
}
@@ -133,8 +134,8 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
133134

134135
if cniNode.GetDeletionTimestamp().IsZero() {
135136
cniNodeCopy := cniNode.DeepCopy()
136-
shouldPatch, err := r.ensureTagsAndLabels(cniNodeCopy, node, nodeFound)
137-
shouldPatch = r.ensureFinalizer(cniNodeCopy) || shouldPatch
137+
shouldPatch, err := r.ensureTagsAndLabels(cniNodeCopy, node)
138+
shouldPatch = controllerutil.AddFinalizer(cniNodeCopy, config.NodeTerminationFinalizer) || shouldPatch
138139

139140
if shouldPatch {
140141
r.log.Info("patching CNINode to add fields Tags, Labels and finalizer", "cninode", cniNode.Name)
@@ -147,24 +148,31 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
147148
}
148149
}
149150
return ctrl.Result{}, err
150-
151151
} else { // CNINode is marked for deletion
152152
if !nodeFound {
153153
// node is also deleted, proceed with running the cleanup routine and remove the finalizer
154-
155154
// run cleanup for Linux nodes only
156155
if val, ok := cniNode.ObjectMeta.Labels[config.NodeLabelOS]; ok && val == config.OSLinux {
157156
r.log.Info("running the finalizer routine on cniNode", "cniNode", cniNode.Name)
158157
// run cleanup when node id is present
159158
if nodeID, ok := cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey]; ok && nodeID != "" {
160-
if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(); err != nil {
161-
r.log.Error(err, "failed to cleanup resources during node termination")
162-
ec2API.NodeTerminationENICleanupFailure.Inc()
159+
if !r.deletePool.TryAcquire(1) {
160+
r.log.Info("d, will requeue request")
161+
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
163162
}
163+
go func(nodeID string) {
164+
defer r.deletePool.Release(1)
165+
childCtx, cancel := context.WithTimeout(ctx, config.NodeTerminationTimeout)
166+
defer cancel()
167+
if err := r.newResourceCleaner(nodeID, r.eC2Wrapper, r.vpcId, r.log).DeleteLeakedResources(childCtx); err != nil {
168+
r.log.Error(err, "failed to cleanup resources during node termination")
169+
ec2API.NodeTerminationENICleanupFailure.Inc()
170+
}
171+
}(nodeID)
164172
}
165173
}
166174

167-
if err := r.removeFinalizers(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
175+
if err := r.removeFinalizer(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
168176
r.log.Error(err, "failed to remove finalizer on CNINode, will retry", "cniNode", cniNode.Name, "finalizer", config.NodeTerminationFinalizer)
169177
if apierrors.IsConflict(err) {
170178
return ctrl.Result{Requeue: true}, nil
@@ -188,7 +196,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
188196
Spec: cniNode.Spec,
189197
}
190198

191-
if err := r.removeFinalizers(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
199+
if err := r.removeFinalizer(ctx, cniNode, config.NodeTerminationFinalizer); err != nil {
192200
r.log.Error(err, "failed to remove finalizer on CNINode, will retry")
193201
return ctrl.Result{}, err
194202
}
@@ -233,7 +241,7 @@ func (r *CNINodeReconciler) waitTillCNINodeDeleted(nameSpacedCNINode types.Names
233241
oldCNINode := &v1alpha1.CNINode{}
234242

235243
return wait.PollUntilContextTimeout(context.TODO(), 500*time.Millisecond, time.Second*3, true, func(ctx context.Context) (bool, error) {
236-
if err := r.Client.Get(ctx, nameSpacedCNINode, oldCNINode); err != nil && errors.IsNotFound(err) {
244+
if err := r.Client.Get(ctx, nameSpacedCNINode, oldCNINode); err != nil && apierrors.IsNotFound(err) {
237245
return true, nil
238246
}
239247
return false, nil
@@ -248,17 +256,7 @@ func (r *CNINodeReconciler) createCNINodeFromObj(ctx context.Context, newCNINode
248256
})
249257
}
250258

251-
func (r *CNINodeReconciler) GetNodeID(node *v1.Node) (string, error) {
252-
if node.Spec.ProviderID == "" {
253-
return "", fmt.Errorf("provider ID is not set for node %s", node.Name)
254-
}
255-
if idx := strings.LastIndex(node.Spec.ProviderID, "/"); idx != -1 && idx < len(node.Spec.ProviderID)-1 {
256-
return node.Spec.ProviderID[idx+1:], nil
257-
}
258-
return "", fmt.Errorf("invalid provider ID format for node %s, with providerId", node.Spec.ProviderID)
259-
}
260-
261-
func (r *CNINodeReconciler) ensureTagsAndLabels(cniNode *v1alpha1.CNINode, node *v1.Node, nodeFound bool) (bool, error) {
259+
func (r *CNINodeReconciler) ensureTagsAndLabels(cniNode *v1alpha1.CNINode, node *v1.Node) (bool, error) {
262260
shouldPatch := false
263261
var err error
264262
if cniNode.Spec.Tags == nil {
@@ -269,11 +267,11 @@ func (r *CNINodeReconciler) ensureTagsAndLabels(cniNode *v1alpha1.CNINode, node
269267
cniNode.Spec.Tags[config.VPCCNIClusterNameKey] = r.clusterName
270268
shouldPatch = true
271269
}
272-
if nodeFound {
270+
if node != nil {
273271
var nodeID string
274-
nodeID, err = r.GetNodeID(node)
272+
nodeID, err = utils.GetNodeID(node)
275273

276-
if cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey] != nodeID {
274+
if nodeID != "" && cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey] != nodeID {
277275
cniNode.Spec.Tags[config.NetworkInterfaceNodeIDKey] = nodeID
278276
shouldPatch = true
279277
}
@@ -290,28 +288,12 @@ func (r *CNINodeReconciler) ensureTagsAndLabels(cniNode *v1alpha1.CNINode, node
290288
return shouldPatch, err
291289
}
292290

293-
func (r *CNINodeReconciler) ensureFinalizer(cniNode *v1alpha1.CNINode) bool {
294-
shouldPatch := false
295-
if !controllerutil.ContainsFinalizer(cniNode, config.NodeTerminationFinalizer) {
296-
r.log.Info("adding finalizer", "object", cniNode.GetObjectKind().GroupVersionKind().Kind, "name", cniNode.GetName(), "finalizer", config.NodeTerminationFinalizer)
297-
controllerutil.AddFinalizer(cniNode, config.NodeTerminationFinalizer)
298-
shouldPatch = true
299-
}
300-
return shouldPatch
301-
}
302-
303-
func (r *CNINodeReconciler) removeFinalizers(ctx context.Context, cniNode *v1alpha1.CNINode, finalizer string) error {
291+
func (r *CNINodeReconciler) removeFinalizer(ctx context.Context, cniNode *v1alpha1.CNINode, finalizer string) error {
304292
cniNodeCopy := cniNode.DeepCopy()
305-
needsUpdate := false
306293

307-
if controllerutil.ContainsFinalizer(cniNodeCopy, finalizer) {
294+
if controllerutil.RemoveFinalizer(cniNodeCopy, finalizer) {
308295
r.log.Info("removing finalizer for cninode", "name", cniNode.GetName(), "finalizer", finalizer)
309-
controllerutil.RemoveFinalizer(cniNodeCopy, finalizer)
310-
needsUpdate = true
311-
}
312-
313-
if !needsUpdate {
314-
return nil
296+
return r.Client.Patch(ctx, cniNodeCopy, client.MergeFromWithOptions(cniNode, client.MergeFromWithOptimisticLock{}))
315297
}
316-
return r.Client.Patch(ctx, cniNodeCopy, client.MergeFromWithOptions(cniNode, client.MergeFromWithOptimisticLock{}))
298+
return nil
317299
}

controllers/crds/cninode_controller_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/go-logr/logr"
1515
"github.com/golang/mock/gomock"
1616
"github.com/stretchr/testify/assert"
17+
"golang.org/x/sync/semaphore"
1718
corev1 "k8s.io/api/core/v1"
1819
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1920
"k8s.io/apimachinery/pkg/runtime"
@@ -61,6 +62,7 @@ func NewCNINodeMock(ctrl *gomock.Controller, mockObjects ...client.Object) *CNIN
6162
log: zap.New(),
6263
clusterName: mockClusterName,
6364
vpcId: "vpc-000000000000",
65+
deletePool: semaphore.NewWeighted(10),
6466
},
6567
}
6668
}
@@ -120,7 +122,7 @@ func TestCNINodeReconcile(t *testing.T) {
120122
f.mockCNINode.Reconciler.newResourceCleaner = func(nodeID string, eC2Wrapper ec2API.EC2Wrapper, vpcID string, log logr.Logger) cleanup.ResourceCleaner {
121123
return f.mockResourceCleaner
122124
}
123-
f.mockResourceCleaner.EXPECT().DeleteLeakedResources().Times(0)
125+
f.mockResourceCleaner.EXPECT().DeleteLeakedResources(gomock.Any()).Times(0)
124126
},
125127
asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) {
126128
assert.NoError(t, err)
@@ -152,7 +154,7 @@ func TestCNINodeReconcile(t *testing.T) {
152154
assert.Equal(t, "i-1234567890", nodeID)
153155
return f.mockResourceCleaner
154156
}
155-
f.mockResourceCleaner.EXPECT().DeleteLeakedResources().Times(1).Return(nil)
157+
f.mockResourceCleaner.EXPECT().DeleteLeakedResources(gomock.Any()).Times(1).Return(nil)
156158

157159
},
158160
asserts: func(res reconcile.Result, err error, cniNode *v1alpha1.CNINode) {
@@ -186,7 +188,6 @@ func TestCNINodeReconcile(t *testing.T) {
186188
assert.Contains(t, cniNode.Finalizers, config.NodeTerminationFinalizer)
187189
},
188190
},
189-
190191
}
191192
for _, tt := range tests {
192193
t.Run(tt.name, func(t *testing.T) {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ require (
2727
github.com/prometheus/common v0.62.0
2828
github.com/stretchr/testify v1.10.0
2929
go.uber.org/zap v1.27.0
30+
golang.org/x/sync v0.13.0
3031
golang.org/x/time v0.11.0
3132
gomodules.xyz/jsonpatch/v2 v2.4.0
3233
k8s.io/api v0.33.0
@@ -50,7 +51,6 @@ require (
5051
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
5152
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
5253
github.com/x448/float16 v0.8.4 // indirect
53-
golang.org/x/sync v0.13.0 // indirect
5454
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
5555
sigs.k8s.io/randfill v1.0.0 // indirect
5656
)

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,7 @@ func main() {
439439
clusterName,
440440
vpcID,
441441
finalizerManager,
442+
maxNodeConcurrentReconciles,
442443
cleanup.NewNodeResourceCleaner,
443444
).SetupWithManager(mgr, maxNodeConcurrentReconciles)); err != nil {
444445
setupLog.Error(err, "unable to create controller", "controller", "CNINode")

mocks/amazon-vcp-resource-controller-k8s/pkg/aws/ec2/api/cleanup/mock_resource_cleaner.go

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)