@@ -25,8 +25,9 @@ import (
25
25
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/utils"
26
26
"github.com/go-logr/logr"
27
27
"github.com/prometheus/client_golang/prometheus"
28
+ "golang.org/x/sync/semaphore"
28
29
v1 "k8s.io/api/core/v1"
29
- "k8s.io/apimachinery/pkg/api/errors"
30
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
30
31
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31
32
"k8s.io/apimachinery/pkg/runtime"
32
33
"k8s.io/apimachinery/pkg/types"
@@ -35,6 +36,7 @@ import (
35
36
ctrl "sigs.k8s.io/controller-runtime"
36
37
"sigs.k8s.io/controller-runtime/pkg/client"
37
38
"sigs.k8s.io/controller-runtime/pkg/controller"
39
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
38
40
"sigs.k8s.io/controller-runtime/pkg/metrics"
39
41
)
40
42
@@ -75,7 +77,8 @@ type CNINodeReconciler struct {
75
77
clusterName string
76
78
vpcId string
77
79
finalizerManager k8s.FinalizerManager
78
- newResourceCleaner func (nodeID string , eC2Wrapper ec2API.EC2Wrapper , vpcID string ) cleanup.ResourceCleaner
80
+ deletePool * semaphore.Weighted
81
+ newResourceCleaner func (nodeID string , eC2Wrapper ec2API.EC2Wrapper , vpcID string , log logr.Logger ) cleanup.ResourceCleaner
79
82
}
80
83
81
84
func NewCNINodeReconciler (
@@ -88,7 +91,8 @@ func NewCNINodeReconciler(
88
91
clusterName string ,
89
92
vpcId string ,
90
93
finalizerManager k8s.FinalizerManager ,
91
- newResourceCleaner func (nodeID string , eC2Wrapper ec2API.EC2Wrapper , vpcID string ) cleanup.ResourceCleaner ,
94
+ maxConcurrentWorkers int ,
95
+ newResourceCleaner func (nodeID string , eC2Wrapper ec2API.EC2Wrapper , vpcID string , log logr.Logger ) cleanup.ResourceCleaner ,
92
96
) * CNINodeReconciler {
93
97
return & CNINodeReconciler {
94
98
Client : client ,
@@ -100,6 +104,7 @@ func NewCNINodeReconciler(
100
104
clusterName : clusterName ,
101
105
vpcId : vpcId ,
102
106
finalizerManager : finalizerManager ,
107
+ deletePool : semaphore .NewWeighted (int64 (maxConcurrentWorkers )),
103
108
newResourceCleaner : newResourceCleaner ,
104
109
}
105
110
}
@@ -118,7 +123,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
118
123
nodeFound := true
119
124
node := & v1.Node {}
120
125
if err := r .Client .Get (ctx , req .NamespacedName , node ); err != nil {
121
- if errors .IsNotFound (err ) {
126
+ if apierrors .IsNotFound (err ) {
122
127
nodeFound = false
123
128
} else {
124
129
r .log .Error (err , "failed to get the node object in CNINode reconciliation, will retry" )
@@ -128,66 +133,50 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
128
133
}
129
134
130
135
if cniNode .GetDeletionTimestamp ().IsZero () {
131
- shouldPatch := false
132
136
cniNodeCopy := cniNode .DeepCopy ()
133
- // Add cluster name tag if it does not exist
134
- val , ok := cniNode .Spec .Tags [config .VPCCNIClusterNameKey ]
135
- if ! ok || val != r .clusterName {
136
- if len (cniNodeCopy .Spec .Tags ) != 0 {
137
- cniNodeCopy .Spec .Tags [config .VPCCNIClusterNameKey ] = r .clusterName
138
- } else {
139
- cniNodeCopy .Spec .Tags = map [string ]string {
140
- config .VPCCNIClusterNameKey : r .clusterName ,
141
- }
142
- }
143
- shouldPatch = true
144
- }
145
- // if node exists, get & add OS label if it does not exist on CNINode
146
- if nodeFound {
147
- nodeLabelOS := node .ObjectMeta .Labels [config .NodeLabelOS ]
148
- val , ok = cniNode .ObjectMeta .Labels [config .NodeLabelOS ]
149
- if ! ok || val != nodeLabelOS {
150
- if len (cniNodeCopy .ObjectMeta .Labels ) != 0 {
151
- cniNodeCopy .ObjectMeta .Labels [config .NodeLabelOS ] = nodeLabelOS
152
- } else {
153
- cniNodeCopy .ObjectMeta .Labels = map [string ]string {
154
- config .NodeLabelOS : nodeLabelOS ,
155
- }
156
- }
157
- shouldPatch = true
158
- }
159
- }
137
+ shouldPatch , err := r .ensureTagsAndLabels (cniNodeCopy , node )
138
+ shouldPatch = controllerutil .AddFinalizer (cniNodeCopy , config .NodeTerminationFinalizer ) || shouldPatch
160
139
161
140
if shouldPatch {
162
- r .log .Info ("patching CNINode to add required fields Tags and Labels " , "cninode" , cniNode .Name )
163
- return ctrl. Result {}, r .Client .Patch (ctx , cniNodeCopy , client .MergeFromWithOptions (cniNode , client.MergeFromWithOptimisticLock {}))
164
- }
165
-
166
- // Add finalizer if it does not exist
167
- if err := r . finalizerManager . AddFinalizers ( ctx , cniNode , config . NodeTerminationFinalizer ); err != nil {
168
- r . log . Error ( err , "failed to add finalizer on CNINode, will retry" , "cniNode" , cniNode . Name , "finalizer" , config . NodeTerminationFinalizer )
169
- return ctrl. Result {}, err
141
+ r .log .Info ("patching CNINode to add fields Tags, Labels and finalizer " , "cninode" , cniNode .Name )
142
+ if err := r .Client .Patch (ctx , cniNodeCopy , client .MergeFromWithOptions (cniNode , client.MergeFromWithOptimisticLock {})); err != nil {
143
+ if apierrors . IsConflict ( err ) {
144
+ r . log . Info ( "failed to update cninode" , "cninode" , cniNode . Name , "error" , err )
145
+ return ctrl. Result { Requeue : true }, nil
146
+ }
147
+ return ctrl. Result {}, err
148
+ }
170
149
}
171
- return ctrl.Result {}, nil
172
-
150
+ return ctrl.Result {}, err
173
151
} else { // CNINode is marked for deletion
174
152
if ! nodeFound {
175
153
// node is also deleted, proceed with running the cleanup routine and remove the finalizer
176
-
177
154
// run cleanup for Linux nodes only
178
155
if val , ok := cniNode .ObjectMeta .Labels [config .NodeLabelOS ]; ok && val == config .OSLinux {
179
156
r .log .Info ("running the finalizer routine on cniNode" , "cniNode" , cniNode .Name )
180
157
// run cleanup when node id is present
181
158
if nodeID , ok := cniNode .Spec .Tags [config .NetworkInterfaceNodeIDKey ]; ok && nodeID != "" {
182
- if err := r . newResourceCleaner ( nodeID , r . eC2Wrapper , r . vpcId ). DeleteLeakedResources (); err != nil {
183
- r .log .Error ( err , "failed to cleanup resources during node termination " )
184
- ec2API . NodeTerminationENICleanupFailure . Inc ()
159
+ if ! r . deletePool . TryAcquire ( 1 ) {
160
+ r .log .Info ( "d, will requeue request " )
161
+ return ctrl. Result { RequeueAfter : 10 * time . Second }, nil
185
162
}
163
+ go func (nodeID string ) {
164
+ defer r .deletePool .Release (1 )
165
+ childCtx , cancel := context .WithTimeout (ctx , config .NodeTerminationTimeout )
166
+ defer cancel ()
167
+ if err := r .newResourceCleaner (nodeID , r .eC2Wrapper , r .vpcId , r .log ).DeleteLeakedResources (childCtx ); err != nil {
168
+ r .log .Error (err , "failed to cleanup resources during node termination" )
169
+ ec2API .NodeTerminationENICleanupFailure .Inc ()
170
+ }
171
+ }(nodeID )
186
172
}
187
173
}
188
174
189
- if err := r .finalizerManager . RemoveFinalizers (ctx , cniNode , config .NodeTerminationFinalizer ); err != nil {
175
+ if err := r .removeFinalizer (ctx , cniNode , config .NodeTerminationFinalizer ); err != nil {
190
176
r .log .Error (err , "failed to remove finalizer on CNINode, will retry" , "cniNode" , cniNode .Name , "finalizer" , config .NodeTerminationFinalizer )
177
+ if apierrors .IsConflict (err ) {
178
+ return ctrl.Result {Requeue : true }, nil
179
+ }
191
180
return ctrl.Result {}, err
192
181
}
193
182
return ctrl.Result {}, nil
@@ -207,7 +196,7 @@ func (r *CNINodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
207
196
Spec : cniNode .Spec ,
208
197
}
209
198
210
- if err := r .finalizerManager . RemoveFinalizers (ctx , cniNode , config .NodeTerminationFinalizer ); err != nil {
199
+ if err := r .removeFinalizer (ctx , cniNode , config .NodeTerminationFinalizer ); err != nil {
211
200
r .log .Error (err , "failed to remove finalizer on CNINode, will retry" )
212
201
return ctrl.Result {}, err
213
202
}
@@ -252,7 +241,7 @@ func (r *CNINodeReconciler) waitTillCNINodeDeleted(nameSpacedCNINode types.Names
252
241
oldCNINode := & v1alpha1.CNINode {}
253
242
254
243
return wait .PollUntilContextTimeout (context .TODO (), 500 * time .Millisecond , time .Second * 3 , true , func (ctx context.Context ) (bool , error ) {
255
- if err := r .Client .Get (ctx , nameSpacedCNINode , oldCNINode ); err != nil && errors .IsNotFound (err ) {
244
+ if err := r .Client .Get (ctx , nameSpacedCNINode , oldCNINode ); err != nil && apierrors .IsNotFound (err ) {
256
245
return true , nil
257
246
}
258
247
return false , nil
@@ -266,3 +255,45 @@ func (r *CNINodeReconciler) createCNINodeFromObj(ctx context.Context, newCNINode
266
255
return r .Client .Create (ctx , newCNINode )
267
256
})
268
257
}
258
+
259
+ func (r * CNINodeReconciler ) ensureTagsAndLabels (cniNode * v1alpha1.CNINode , node * v1.Node ) (bool , error ) {
260
+ shouldPatch := false
261
+ var err error
262
+ if cniNode .Spec .Tags == nil {
263
+ cniNode .Spec .Tags = make (map [string ]string )
264
+ }
265
+ // add cluster name tag if it does not exist
266
+ if cniNode .Spec .Tags [config .VPCCNIClusterNameKey ] != r .clusterName {
267
+ cniNode .Spec .Tags [config .VPCCNIClusterNameKey ] = r .clusterName
268
+ shouldPatch = true
269
+ }
270
+ if node != nil {
271
+ var nodeID string
272
+ nodeID , err = utils .GetNodeID (node )
273
+
274
+ if nodeID != "" && cniNode .Spec .Tags [config .NetworkInterfaceNodeIDKey ] != nodeID {
275
+ cniNode .Spec .Tags [config .NetworkInterfaceNodeIDKey ] = nodeID
276
+ shouldPatch = true
277
+ }
278
+
279
+ // add node label if it does not exist
280
+ if cniNode .ObjectMeta .Labels == nil {
281
+ cniNode .ObjectMeta .Labels = make (map [string ]string )
282
+ }
283
+ if cniNode .ObjectMeta .Labels [config .NodeLabelOS ] != node .ObjectMeta .Labels [config .NodeLabelOS ] {
284
+ cniNode .ObjectMeta .Labels [config .NodeLabelOS ] = node .ObjectMeta .Labels [config .NodeLabelOS ]
285
+ shouldPatch = true
286
+ }
287
+ }
288
+ return shouldPatch , err
289
+ }
290
+
291
+ func (r * CNINodeReconciler ) removeFinalizer (ctx context.Context , cniNode * v1alpha1.CNINode , finalizer string ) error {
292
+ cniNodeCopy := cniNode .DeepCopy ()
293
+
294
+ if controllerutil .RemoveFinalizer (cniNodeCopy , finalizer ) {
295
+ r .log .Info ("removing finalizer for cninode" , "name" , cniNode .GetName (), "finalizer" , finalizer )
296
+ return r .Client .Patch (ctx , cniNodeCopy , client .MergeFromWithOptions (cniNode , client.MergeFromWithOptimisticLock {}))
297
+ }
298
+ return nil
299
+ }
0 commit comments