Skip to content

Commit 6b8426a

Browse files
authored
Merge pull request #23 from civo/fix/recoard-reboot-time
Fix to prevent consecutive reboots in case of reboot delay.
2 parents 12d926b + 44c44cd commit 6b8426a

File tree

2 files changed

+169
-0
lines changed

2 files changed

+169
-0
lines changed

pkg/watcher/watcher.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"log/slog"
77
"strconv"
8+
"sync"
89
"time"
910

1011
"github.com/civo/civogo"
@@ -39,6 +40,9 @@ type watcher struct {
3940
nodeDesiredGPUCount int
4041
rebootTimeWindowMinutes time.Duration
4142

43+
// NOTE: This is only effective when running with a single node-agent. If we want to run multiple instances, additional logic modifications will be required.
44+
lastRebootCmdTimes sync.Map
45+
4246
nodeSelector *metav1.LabelSelector
4347
}
4448

@@ -157,11 +161,23 @@ func (w *watcher) run(ctx context.Context) error {
157161

158162
for _, node := range nodes.Items {
159163
if !isNodeDesiredGPU(&node, w.nodeDesiredGPUCount) || !isNodeReady(&node) {
164+
165+
// LTT: LastTransitionTime of node.
166+
// LRCT: LastRebootCmdTimes
167+
// 60: Threshold time (example)
168+
// - LTT > 60 , LRCT < 60 dont reboot
169+
// - LTT < 60 , LRCT < 60 dont reboot
170+
// - LTT < 60 , LRCT > 60 dont reboot
171+
// - LTT > 60 , LRCT > 60 reboot
160172
slog.Info("Node is not ready, attempting to reboot", "node", node.GetName())
161173
if isReadyOrNotReadyStatusChangedAfter(&node, thresholdTime) {
162174
slog.Info("Skipping reboot because Ready/NotReady status was updated recently", "node", node.GetName())
163175
continue
164176
}
177+
if w.isLastRebootCommandTimeAfter(node.GetName(), thresholdTime) {
178+
slog.Info("Skipping reboot because Reboot command was executed recently", "node", node.GetName())
179+
continue
180+
}
165181
if err := w.rebootNode(node.GetName()); err != nil {
166182
slog.Error("Failed to reboot Node", "node", node.GetName(), "error", err)
167183
return fmt.Errorf("failed to reboot node: %w", err)
@@ -193,6 +209,32 @@ func isReadyOrNotReadyStatusChangedAfter(node *corev1.Node, thresholdTime time.T
193209
return lastChangedTime.After(thresholdTime)
194210
}
195211

212+
// isLastRebootCommandTimeAfter checks if the last reboot command time for the specified node
213+
// is after the given threshold time. In case of delays in reboot, the
214+
// LastTransitionTime of node might not be updated, so it compares the latest reboot
215+
// command time to prevent sending reboot commands multiple times.
216+
// NOTE: This is only effective when running with a single node-agent. If we want to run multiple instances, additional logic modifications will be required.
217+
func (w *watcher) isLastRebootCommandTimeAfter(nodeName string, thresholdTime time.Time) bool {
218+
v, ok := w.lastRebootCmdTimes.Load(nodeName)
219+
if !ok {
220+
slog.Info("LastRebootCommandTime not found", "node", nodeName)
221+
return false
222+
}
223+
lastRebootCmdTime, ok := v.(time.Time)
224+
if !ok {
225+
slog.Info("LastRebootCommandTime is invalid, so it will be removed from the records", "node", nodeName, "value", v)
226+
w.lastRebootCmdTimes.Delete(nodeName)
227+
return false
228+
}
229+
230+
slog.Info("Checking if LastRebootCommandTime has changed recently",
231+
"node", nodeName,
232+
"lastRebootCommandTime", lastRebootCmdTime.String(),
233+
"thresholdTime", thresholdTime.String())
234+
235+
return lastRebootCmdTime.After(thresholdTime)
236+
}
237+
196238
func isNodeReady(node *corev1.Node) bool {
197239
for _, cond := range node.Status.Conditions {
198240
if cond.Type == corev1.NodeReady {
@@ -241,5 +283,6 @@ func (w *watcher) rebootNode(name string) error {
241283
return fmt.Errorf("failed to reboot instance, clusterID: %s, instanceID: %s: %w", w.clusterID, instance.ID, err)
242284
}
243285
slog.Info("Instance is rebooting", "instanceID", instance.ID, "node", name)
286+
w.lastRebootCmdTimes.Store(name, time.Now())
244287
return nil
245288
}

pkg/watcher/watcher_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,50 @@ func TestRun(t *testing.T) {
369369
t.Helper()
370370
client := w.client.(*fake.Clientset)
371371

372+
w.lastRebootCmdTimes.Store("node-01", time.Now())
373+
374+
nodes := &corev1.NodeList{
375+
Items: []corev1.Node{
376+
{
377+
ObjectMeta: metav1.ObjectMeta{
378+
Name: "node-01",
379+
Labels: map[string]string{
380+
nodePoolLabelKey: testNodePoolID,
381+
},
382+
},
383+
Status: corev1.NodeStatus{
384+
Conditions: []corev1.NodeCondition{
385+
{
386+
Type: corev1.NodeReady,
387+
Status: corev1.ConditionFalse,
388+
},
389+
},
390+
Allocatable: corev1.ResourceList{
391+
gpuResourceName: resource.MustParse("8"),
392+
},
393+
},
394+
},
395+
},
396+
}
397+
client.Fake.PrependReactor("list", "nodes", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
398+
return true, nodes, nil
399+
})
400+
},
401+
},
402+
{
403+
name: "Returns nil and skips reboot when GPU count matches desired but node is not ready, and LastRebootCmdTime is more recent than thresholdTime",
404+
args: args{
405+
opts: []Option{
406+
WithKubernetesClient(fake.NewSimpleClientset()),
407+
WithCivoClient(&FakeClient{}),
408+
WithDesiredGPUCount(testNodeDesiredGPUCount),
409+
},
410+
nodePoolID: testNodePoolID,
411+
},
412+
beforeFunc: func(w *watcher) {
413+
t.Helper()
414+
client := w.client.(*fake.Clientset)
415+
372416
nodes := &corev1.NodeList{
373417
Items: []corev1.Node{
374418
{
@@ -600,6 +644,88 @@ func TestIsReadyOrNotReadyStatusChangedAfter(t *testing.T) {
600644
}
601645
}
602646

647+
func TestIsLastRebootCommandTimeAfter(t *testing.T) {
648+
type test struct {
649+
name string
650+
nodeName string
651+
opts []Option
652+
thresholdTime time.Time
653+
beforeFunc func(*watcher)
654+
want bool
655+
}
656+
657+
tests := []test{
658+
{
659+
name: "Return true when last reboot command time is after threshold",
660+
opts: []Option{
661+
WithKubernetesClient(fake.NewSimpleClientset()),
662+
WithCivoClient(&FakeClient{}),
663+
},
664+
nodeName: "node-01",
665+
thresholdTime: time.Now().Add(-time.Hour),
666+
beforeFunc: func(w *watcher) {
667+
w.lastRebootCmdTimes.Store("node-01", time.Now())
668+
},
669+
want: true,
670+
},
671+
{
672+
name: "Return false when last reboot command time is before threshold",
673+
opts: []Option{
674+
WithKubernetesClient(fake.NewSimpleClientset()),
675+
WithCivoClient(&FakeClient{}),
676+
},
677+
nodeName: "node-01",
678+
thresholdTime: time.Now().Add(-time.Hour),
679+
beforeFunc: func(w *watcher) {
680+
w.lastRebootCmdTimes.Store("nodde-01", time.Now().Add(-2*time.Hour))
681+
},
682+
want: false,
683+
},
684+
{
685+
name: "Return false when last reboot command time not found",
686+
opts: []Option{
687+
WithKubernetesClient(fake.NewSimpleClientset()),
688+
WithCivoClient(&FakeClient{}),
689+
},
690+
nodeName: "node-01",
691+
thresholdTime: time.Now().Add(-time.Hour),
692+
want: false,
693+
},
694+
{
695+
name: "Return false when type of last reboot command time is invalid",
696+
opts: []Option{
697+
WithKubernetesClient(fake.NewSimpleClientset()),
698+
WithCivoClient(&FakeClient{}),
699+
},
700+
nodeName: "node-01",
701+
thresholdTime: time.Now().Add(-time.Hour),
702+
beforeFunc: func(w *watcher) {
703+
w.lastRebootCmdTimes.Store("nodde-01", "invalid-type")
704+
},
705+
want: false,
706+
},
707+
}
708+
709+
for _, test := range tests {
710+
t.Run(test.name, func(t *testing.T) {
711+
w, err := NewWatcher(t.Context(),
712+
testApiURL, testApiKey, testRegion, testClusterID, testNodePoolID, test.opts...)
713+
if err != nil {
714+
t.Fatal(err)
715+
}
716+
717+
obj := w.(*watcher)
718+
if test.beforeFunc != nil {
719+
test.beforeFunc(obj)
720+
}
721+
got := obj.isLastRebootCommandTimeAfter(test.nodeName, test.thresholdTime)
722+
if got != test.want {
723+
t.Errorf("got = %v, want %v", got, test.want)
724+
}
725+
})
726+
}
727+
}
728+
603729
func TestIsNodeReady(t *testing.T) {
604730
type test struct {
605731
name string

0 commit comments

Comments
 (0)