@@ -605,6 +605,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
605
605
}
606
606
607
607
func (c * clusterCache ) watchEvents (ctx context.Context , api kube.APIResourceInfo , resClient dynamic.ResourceInterface , ns string , resourceVersion string ) {
608
+ timeoutSeconds := int64 (c .watchResyncTimeout .Seconds ())
608
609
kube .RetryUntilSucceed (ctx , watchResourcesRetryTimeout , fmt .Sprintf ("watch %s on %s" , api .GroupKind , c .config .Host ), c .log , func () (err error ) {
609
610
defer func () {
610
611
if r := recover (); r != nil {
@@ -622,6 +623,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
622
623
623
624
w , err := watchutil .NewRetryWatcher (resourceVersion , & cache.ListWatch {
624
625
WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
626
+ options .TimeoutSeconds = & timeoutSeconds
625
627
res , err := resClient .Watch (ctx , options )
626
628
if errors .IsNotFound (err ) {
627
629
c .stopWatching (api .GroupKind , ns )
@@ -633,30 +635,17 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
633
635
return err
634
636
}
635
637
636
- defer func () {
637
- w .Stop ()
638
- resourceVersion = ""
639
- }()
640
-
641
- var watchResyncTimeoutCh <- chan time.Time
642
- if c .watchResyncTimeout > 0 {
643
- shouldResync := time .NewTimer (c .watchResyncTimeout )
644
- defer shouldResync .Stop ()
645
- watchResyncTimeoutCh = shouldResync .C
646
- }
638
+ defer w .Stop ()
647
639
648
640
for {
649
641
select {
650
642
// stop watching when parent context got cancelled
651
643
case <- ctx .Done ():
652
644
return nil
653
645
654
- // re-synchronize API state and restart watch periodically
655
- case <- watchResyncTimeoutCh :
656
- return fmt .Errorf ("Resyncing %s on %s due to timeout" , api .GroupKind , c .config .Host )
657
-
658
646
// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
659
647
case <- w .Done ():
648
+ resourceVersion = ""
660
649
return fmt .Errorf ("Watch %s on %s has closed" , api .GroupKind , c .config .Host )
661
650
662
651
case event , ok := <- w .ResultChan ():
@@ -666,8 +655,10 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
666
655
667
656
obj , ok := event .Object .(* unstructured.Unstructured )
668
657
if ! ok {
658
+ resourceVersion = ""
669
659
return fmt .Errorf ("Failed to convert to *unstructured.Unstructured: %v" , event .Object )
670
660
}
661
+ resourceVersion = obj .GetResourceVersion ()
671
662
672
663
c .processEvent (event .Type , obj )
673
664
if kube .IsCRD (obj ) {
0 commit comments