@@ -605,6 +605,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
 }
 
 func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
+	timeoutSeconds := int64(c.watchResyncTimeout.Seconds())
 	kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
 		defer func() {
 			if r := recover(); r != nil {
@@ -622,6 +623,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 
 		w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				options.TimeoutSeconds = &timeoutSeconds
 				res, err := resClient.Watch(ctx, options)
 				if errors.IsNotFound(err) {
 					c.stopWatching(api.GroupKind, ns)
@@ -633,30 +635,21 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 			return err
 		}
 
-		defer func() {
-			w.Stop()
-			resourceVersion = ""
-		}()
-
-		var watchResyncTimeoutCh <-chan time.Time
-		if c.watchResyncTimeout > 0 {
-			shouldResync := time.NewTimer(c.watchResyncTimeout)
-			defer shouldResync.Stop()
-			watchResyncTimeoutCh = shouldResync.C
-		}
+		defer w.Stop()
 
 		for {
 			select {
 			// stop watching when parent context got cancelled
 			case <-ctx.Done():
 				return nil
 
-			// re-synchronize API state and restart watch periodically
-			case <-watchResyncTimeoutCh:
-				return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)
-
 			// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
 			case <-w.Done():
+				// The underlying retry watcher has stopped, possibly due to specifying an RV in
+				// the watch request that is stale (error code 410). This forces us to relist
+				// objects from the kube-apiserver to get a fresher RV and we invoke that relist
+				// by resetting the locally stored RV.
+				resourceVersion = ""
 				return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.config.Host)
 
 			case event, ok := <-w.ResultChan():
@@ -666,8 +659,16 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 
 				obj, ok := event.Object.(*unstructured.Unstructured)
 				if !ok {
+					// We failed to cast the object received in the watch event to something
+					// that contains a resource version field. Because of that, we don't know
+					// from what RV we should reinitialize the watch connection, so in order to
+					// avoid any inconsistencies due to accidental skipping of a potential RV,
+					// we reset the locally stored RV to forcefully invoke the list API call to
+					// get it from the kube-apiserver.
+					resourceVersion = ""
 					return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
 				}
+				resourceVersion = obj.GetResourceVersion()
 
 				c.processEvent(event.Type, obj)
 				if kube.IsCRD(obj) {
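
Taken together, the hunks above replace the client-side resync timer with a server-side watch timeout (`options.TimeoutSeconds`) and make the cached `resourceVersion` the single signal for when to relist. The sketch below is a minimal, self-contained illustration of the same pattern against a bare `dynamic.ResourceInterface`; it is not part of this PR, and the function name `watchWithServerSideResync` and its parameters are hypothetical.

```go
// Minimal sketch (not from this PR): server-side watch timeout plus RV-reset-driven relist.
package watchsketch

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/cache"
	watchutil "k8s.io/client-go/tools/watch"
)

func watchWithServerSideResync(ctx context.Context, resClient dynamic.ResourceInterface, resyncTimeout time.Duration) error {
	timeoutSeconds := int64(resyncTimeout.Seconds())
	resourceVersion := "" // empty RV means "relist before watching"

	for ctx.Err() == nil {
		if resourceVersion == "" {
			// Relist to obtain a fresh resource version from the kube-apiserver.
			list, err := resClient.List(ctx, metav1.ListOptions{})
			if err != nil {
				return err
			}
			resourceVersion = list.GetResourceVersion()
		}

		w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				// Ask the API server to close the watch after the resync interval.
				options.TimeoutSeconds = &timeoutSeconds
				return resClient.Watch(ctx, options)
			},
		})
		if err != nil {
			return err
		}

		func() {
			defer w.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-w.Done():
					// The retry watcher gave up, possibly on a stale RV (HTTP 410): relist next time.
					resourceVersion = ""
					return
				case event, ok := <-w.ResultChan():
					if !ok {
						resourceVersion = ""
						return
					}
					obj, ok := event.Object.(*unstructured.Unstructured)
					if !ok {
						// No RV to resume from, so force a relist.
						resourceVersion = ""
						return
					}
					// Remember the latest RV so a restarted watch can resume from it.
					resourceVersion = obj.GetResourceVersion()
					fmt.Println(event.Type, obj.GetName())
				}
			}
		}()
	}
	return ctx.Err()
}
```

Under this scheme the watch connection is refreshed at the server-imposed interval, and a full relist happens only when the tracked resource version has been reset, i.e. when the retry watcher stopped or an event object could not be cast to `*unstructured.Unstructured`.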