Skip to content

Commit 0c6045d

Browse files
committed
feat: List resources from the watch cache
1 parent 6b2984e commit 0c6045d

File tree

1 file changed

+7
-15
lines changed

1 file changed

+7
-15
lines changed

pkg/cache/cluster.go

+7-15
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,7 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
553553
}
554554

555555
listRetry.Steps = int(c.listRetryLimit)
556+
opts.ResourceVersion = "0"
556557
err := retry.OnError(listRetry, c.listRetryFunc, func() error {
557558
var ierr error
558559
res, ierr = resClient.List(ctx, opts)
@@ -605,6 +606,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
605606
}
606607

607608
func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
609+
timeoutSeconds := int64(c.watchResyncTimeout.Seconds())
608610
kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
609611
defer func() {
610612
if r := recover(); r != nil {
@@ -622,6 +624,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
622624

623625
w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
624626
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
627+
options.TimeoutSeconds = &timeoutSeconds
625628
res, err := resClient.Watch(ctx, options)
626629
if errors.IsNotFound(err) {
627630
c.stopWatching(api.GroupKind, ns)
@@ -633,30 +636,17 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
633636
return err
634637
}
635638

636-
defer func() {
637-
w.Stop()
638-
resourceVersion = ""
639-
}()
640-
641-
var watchResyncTimeoutCh <-chan time.Time
642-
if c.watchResyncTimeout > 0 {
643-
shouldResync := time.NewTimer(c.watchResyncTimeout)
644-
defer shouldResync.Stop()
645-
watchResyncTimeoutCh = shouldResync.C
646-
}
639+
defer w.Stop()
647640

648641
for {
649642
select {
650643
// stop watching when parent context got cancelled
651644
case <-ctx.Done():
652645
return nil
653646

654-
// re-synchronize API state and restart watch periodically
655-
case <-watchResyncTimeoutCh:
656-
return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)
657-
658647
// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
659648
case <-w.Done():
649+
resourceVersion = ""
660650
return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.config.Host)
661651

662652
case event, ok := <-w.ResultChan():
@@ -666,8 +656,10 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
666656

667657
obj, ok := event.Object.(*unstructured.Unstructured)
668658
if !ok {
659+
resourceVersion = ""
669660
return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
670661
}
662+
resourceVersion = obj.GetResourceVersion()
671663

672664
c.processEvent(event.Type, obj)
673665
if kube.IsCRD(obj) {

0 commit comments

Comments
 (0)