@@ -30,8 +30,11 @@ import (
30
30
31
31
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32
32
"k8s.io/apimachinery/pkg/runtime"
33
+ "k8s.io/apimachinery/pkg/runtime/schema"
33
34
"k8s.io/apimachinery/pkg/types"
34
35
"k8s.io/apimachinery/pkg/util/wait"
36
+ "k8s.io/apimachinery/pkg/watch"
37
+ "k8s.io/client-go/dynamic"
35
38
"k8s.io/client-go/kubernetes"
36
39
"k8s.io/client-go/rest"
37
40
"k8s.io/klog/v2"
@@ -45,6 +48,11 @@ const (
45
48
nodeCreationRetry = time .Minute * 5
46
49
// nodeCreationTimeout is the time after which the context for node creation request is cancelled.
47
50
nodeCreationTimeout = time .Minute * 3
51
+
52
+ // watcherRetryCount is the number of times a watcher creation would be retried for.
53
+ watcherRetryCount = 10
54
+ // watcherRetryDelay is the amount of time to wait before trying recreation of a watcher.
55
+ watcherRetryDelay = time .Second * 5
48
56
)
49
57
50
58
var (
@@ -85,15 +93,9 @@ type Manager struct {
85
93
}
86
94
87
95
// Deploy creates CSIAddonsNode custom resource with all required information.
88
- // When information to create the CSIAddonsNode is missing, an error will be
89
- // returned immediately. If creating the CSIAddonsNode in the Kubernetes
90
- // cluster fails (missing CRD, RBAC limitations, ...), an error will be logged,
91
- // and creation will be retried.
92
- func (mgr * Manager ) Deploy () error {
93
- object , err := mgr .getCSIAddonsNode ()
94
- if err != nil {
95
- return fmt .Errorf ("failed to get csiaddonsNode object: %w" , err )
96
- }
96
+ // If creating the CSIAddonsNode in the Kubernetes cluster fails (missing CRD, RBAC limitations, ...)
97
+ // an error will be logged and creation will be retried.
98
+ func (mgr * Manager ) deploy (object * csiaddonsv1alpha1.CSIAddonsNode ) error {
97
99
98
100
// loop until the CSIAddonsNode has been created
99
101
return wait .PollUntilContextTimeout (context .TODO (), nodeCreationRetry , nodeCreationTimeout , true , func (ctx context.Context ) (bool , error ) {
@@ -133,6 +135,10 @@ func (mgr *Manager) newCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) erro
133
135
},
134
136
}
135
137
_ , err = controllerutil .CreateOrUpdate (ctx , cli , csiaddonNode , func () error {
138
+ if ! csiaddonNode .DeletionTimestamp .IsZero () {
139
+ return errors .New ("csiaddonnode is being deleted" )
140
+ }
141
+
136
142
// update the resourceVersion
137
143
resourceVersion := csiaddonNode .ResourceVersion
138
144
if resourceVersion != "" {
@@ -275,3 +281,80 @@ func generateName(nodeID, namespace, ownerKind, ownerName string) (string, error
275
281
276
282
return base , nil
277
283
}
284
+
285
+ // watchCSIAddonsNode starts a watcher for a specific CSIAddonsNode resource identified by its name.
286
+ // If a CSIAddonsNode is deleted, it logs a warning and attempts to recreate it using mgr.Deploy()
287
+ func (mgr * Manager ) watchCSIAddonsNode (node * csiaddonsv1alpha1.CSIAddonsNode ) error {
288
+ // Call deploy on start, this takes care of the cases where
289
+ // a watcher might exit due to an error while trying to
290
+ // recreate CSIAddonsNode in the cluster
291
+ err := mgr .deploy (node )
292
+ if err != nil {
293
+ klog .Fatalf ("Failed to create csiaddonsnode: %v" , err )
294
+ }
295
+
296
+ klog .Infof ("Starting watcher for CSIAddonsNode: %s" , node .Name )
297
+
298
+ dynamicClient , err := dynamic .NewForConfig (mgr .Config )
299
+ if err != nil {
300
+ return fmt .Errorf ("failed to create dynamic client: %w" , err )
301
+ }
302
+
303
+ gvr := schema.GroupVersionResource {
304
+ Group : csiaddonsv1alpha1 .GroupVersion .Group ,
305
+ Version : csiaddonsv1alpha1 .GroupVersion .Version ,
306
+ Resource : "csiaddonsnodes" ,
307
+ }
308
+
309
+ watcher , err := dynamicClient .Resource (gvr ).Namespace (node .Namespace ).Watch (context .Background (), v1.ListOptions {
310
+ FieldSelector : fmt .Sprintf ("metadata.name=%s" , node .Name ),
311
+ })
312
+ if err != nil {
313
+ return fmt .Errorf ("failed to watch CSIAddonsNode objects: %w" , err )
314
+ }
315
+ defer watcher .Stop ()
316
+
317
+ for event := range watcher .ResultChan () {
318
+ switch event .Type {
319
+ case watch .Deleted :
320
+ klog .Infof ("WARNING: An active CSIAddonsNode: %s was deleted, it will be recreated" , node .Name )
321
+
322
+ err := mgr .deploy (node )
323
+ if err != nil {
324
+ return fmt .Errorf ("failed to recreate CSIAddonsNode: %w" , err )
325
+ }
326
+ klog .Infof ("CSIAddonsNode: %s recreated" , node .Name )
327
+ }
328
+ }
329
+
330
+ // The channel was closed by the API server without any errors
331
+ // Simply log it here and return, the dispatcher is responsible
332
+ // for restarting the watcher
333
+ klog .Infof ("Watcher for %s exited gracefully, will be restarted soon" , node .Name )
334
+
335
+ return nil
336
+ }
337
+
338
+ // DispatchWatcher starts a watcher for the CSIAddonsNode and retries
339
+ // if the watcher exits due to an error. It will retry up to a maximum number of
340
+ // attempts defined by watcherRetryCount before returning an error.
341
+ func (mgr * Manager ) DispatchWatcher () error {
342
+ retryCount := 0
343
+ node , err := mgr .getCSIAddonsNode ()
344
+ if err != nil {
345
+ return errors .New ("failed to get CSIAddonsNode object" )
346
+ }
347
+
348
+ for retryCount < int (watcherRetryCount ) {
349
+ err := mgr .watchCSIAddonsNode (node )
350
+ if err != nil {
351
+ klog .Errorf ("Watcher for %s exited, retrying (%d/%d), error: %v" , node .Name , retryCount + 1 , watcherRetryCount , err )
352
+
353
+ retryCount ++
354
+ time .Sleep (watcherRetryDelay )
355
+ } else {
356
+ retryCount = 0
357
+ }
358
+ }
359
+ return fmt .Errorf ("watcher for %s reached max retries, giving up" , node .Name )
360
+ }
0 commit comments