4
4
"context"
5
5
"fmt"
6
6
"reflect"
7
+ "sort"
8
+ "strings"
7
9
"sync"
8
10
"time"
9
11
@@ -12,6 +14,7 @@ import (
12
14
"golang.org/x/time/rate"
13
15
14
16
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
+ "k8s.io/apimachinery/pkg/util/errors"
15
18
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
16
19
"k8s.io/apimachinery/pkg/util/wait"
17
20
@@ -67,6 +70,8 @@ type SyncWorkerStatus struct {
67
70
Initial bool
68
71
VersionHash string
69
72
73
+ LastProgress time.Time
74
+
70
75
Actual configv1.Update
71
76
}
72
77
@@ -304,6 +309,9 @@ func (w *statusWrapper) Report(status SyncWorkerStatus) {
304
309
}
305
310
}
306
311
}
312
+ if status .Fraction > p .Fraction || status .Completed > p .Completed || (status .Failure == nil && status .Actual != p .Actual ) {
313
+ status .LastProgress = time .Now ()
314
+ }
307
315
w .w .updateStatus (status )
308
316
}
309
317
@@ -471,7 +479,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
471
479
}
472
480
473
481
// update each object
474
- err := payload .RunGraph (ctx , graph , maxWorkers , func (ctx context.Context , tasks []* payload.Task ) error {
482
+ errs := payload .RunGraph (ctx , graph , maxWorkers , func (ctx context.Context , tasks []* payload.Task ) error {
475
483
for _ , task := range tasks {
476
484
if contextIsCancelled (ctx ) {
477
485
return cr .CancelError ()
@@ -495,8 +503,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
495
503
}
496
504
return nil
497
505
})
498
- if err != nil {
499
- cr .Error ( err )
506
+ if len ( errs ) > 0 {
507
+ err := cr .Errors ( errs )
500
508
return err
501
509
}
502
510
@@ -518,6 +526,12 @@ func init() {
518
526
)
519
527
}
520
528
529
+ type errCanceled struct {
530
+ err error
531
+ }
532
+
533
+ func (e errCanceled ) Error () string { return e .err .Error () }
534
+
521
535
// consistentReporter hides the details of calculating the status based on the progress
522
536
// of the graph runner.
523
537
type consistentReporter struct {
@@ -553,14 +567,31 @@ func (r *consistentReporter) Error(err error) {
553
567
copied := r .status
554
568
copied .Step = "ApplyResources"
555
569
copied .Fraction = float32 (r .done ) / float32 (r .total )
556
- copied .Failure = err
570
+ if ! isCancelledError (err ) {
571
+ copied .Failure = err
572
+ }
573
+ r .reporter .Report (copied )
574
+ }
575
+
576
+ func (r * consistentReporter ) Errors (errs []error ) error {
577
+ err := summarizeTaskGraphErrors (errs )
578
+
579
+ r .lock .Lock ()
580
+ defer r .lock .Unlock ()
581
+ copied := r .status
582
+ copied .Step = "ApplyResources"
583
+ copied .Fraction = float32 (r .done ) / float32 (r .total )
584
+ if err != nil {
585
+ copied .Failure = err
586
+ }
557
587
r .reporter .Report (copied )
588
+ return err
558
589
}
559
590
560
591
func (r * consistentReporter ) CancelError () error {
561
592
r .lock .Lock ()
562
593
defer r .lock .Unlock ()
563
- return fmt .Errorf ("update was cancelled at %d/%d" , r .done , r .total )
594
+ return errCanceled { fmt .Errorf ("update was cancelled at %d/%d" , r .done , r .total )}
564
595
}
565
596
566
597
func (r * consistentReporter ) Complete () {
@@ -576,6 +607,136 @@ func (r *consistentReporter) Complete() {
576
607
r .reporter .Report (copied )
577
608
}
578
609
610
+ func isCancelledError (err error ) bool {
611
+ if err == nil {
612
+ return false
613
+ }
614
+ _ , ok := err .(errCanceled )
615
+ return ok
616
+ }
617
+
618
+ // summarizeTaskGraphErrors takes a set of errors returned by the execution of a graph and attempts
619
+ // to reduce them to a single cause or message. This is domain specific to the payload and our update
620
+ // algorithms. The return value is the summarized error which may be nil if provided conditions are
621
+ // not truly an error (cancellation).
622
+ // TODO: take into account install vs upgrade
623
+ func summarizeTaskGraphErrors (errs []error ) error {
624
+ // we ignore cancellation errors since they don't provide good feedback to users and are an internal
625
+ // detail of the server
626
+ err := errors .FilterOut (errors .NewAggregate (errs ), isCancelledError )
627
+ if err == nil {
628
+ glog .V (4 ).Infof ("All errors were cancellation errors: %v" , errs )
629
+ return nil
630
+ }
631
+ agg , ok := err .(errors.Aggregate )
632
+ if ! ok {
633
+ errs = []error {err }
634
+ } else {
635
+ errs = agg .Errors ()
636
+ }
637
+
638
+ // log the errors to assist in debugging future summarization
639
+ if glog .V (4 ) {
640
+ glog .Infof ("Summarizing %d errors" , len (errs ))
641
+ for _ , err := range errs {
642
+ if uErr , ok := err .(* payload.UpdateError ); ok {
643
+ if uErr .Task != nil {
644
+ glog .Infof ("Update error %d/%d: %s %s (%T: %v)" , uErr .Task .Index , uErr .Task .Total , uErr .Reason , uErr .Message , uErr .Nested , uErr .Nested )
645
+ } else {
646
+ glog .Infof ("Update error: %s %s (%T: %v)" , uErr .Reason , uErr .Message , uErr .Nested , uErr .Nested )
647
+ }
648
+ } else {
649
+ glog .Infof ("Update error: %T: %v" , err , err )
650
+ }
651
+ }
652
+ }
653
+
654
+ // collapse into a set of common errors where necessary
655
+ if len (errs ) == 1 {
656
+ return errs [0 ]
657
+ }
658
+ if err := newClusterOperatorsNotAvailable (errs ); err != nil {
659
+ return err
660
+ }
661
+ return newMultipleError (errs )
662
+ }
663
+
664
+ // newClusterOperatorsNotAvailable unifies multiple ClusterOperatorNotAvailable errors into
665
+ // a single error. It returns nil if the provided errors are not of the same type.
666
+ func newClusterOperatorsNotAvailable (errs []error ) error {
667
+ names := make ([]string , 0 , len (errs ))
668
+ for _ , err := range errs {
669
+ uErr , ok := err .(* payload.UpdateError )
670
+ if ! ok || uErr .Reason != "ClusterOperatorNotAvailable" {
671
+ return nil
672
+ }
673
+ if len (uErr .Name ) > 0 {
674
+ names = append (names , uErr .Name )
675
+ }
676
+ }
677
+ if len (names ) == 0 {
678
+ return nil
679
+ }
680
+
681
+ nested := make ([]error , 0 , len (errs ))
682
+ for _ , err := range errs {
683
+ nested = append (nested , err )
684
+ }
685
+ sort .Strings (names )
686
+ name := strings .Join (names , ", " )
687
+ return & payload.UpdateError {
688
+ Nested : errors .NewAggregate (errs ),
689
+ Reason : "ClusterOperatorsNotAvailable" ,
690
+ Message : fmt .Sprintf ("Some cluster operators are still updating: %s" , name ),
691
+ Name : name ,
692
+ }
693
+ }
694
+
695
+ // uniqueStrings returns an array with all sequential identical items removed. It modifies the contents
696
+ // of arr. Sort the input array before calling to remove all duplicates.
697
+ func uniqueStrings (arr []string ) []string {
698
+ var last int
699
+ for i := 1 ; i < len (arr ); i ++ {
700
+ if arr [i ] == arr [last ] {
701
+ continue
702
+ }
703
+ last ++
704
+ if last != i {
705
+ arr [last ] = arr [i ]
706
+ }
707
+ }
708
+ if last < len (arr ) {
709
+ last ++
710
+ }
711
+ return arr [:last ]
712
+ }
713
+
714
+ // newMultipleError reports a generic set of errors that block progress. This method expects multiple
715
+ // errors but handles singular and empty arrays gracefully. If all errors have the same message, the
716
+ // first item is returned.
717
+ func newMultipleError (errs []error ) error {
718
+ if len (errs ) == 0 {
719
+ return nil
720
+ }
721
+ if len (errs ) == 1 {
722
+ return errs [0 ]
723
+ }
724
+ messages := make ([]string , 0 , len (errs ))
725
+ for _ , err := range errs {
726
+ messages = append (messages , err .Error ())
727
+ }
728
+ sort .Strings (messages )
729
+ messages = uniqueStrings (messages )
730
+ if len (messages ) == 0 {
731
+ return errs [0 ]
732
+ }
733
+ return & payload.UpdateError {
734
+ Nested : errors .NewAggregate (errs ),
735
+ Reason : "MultipleErrors" ,
736
+ Message : fmt .Sprintf ("Multiple errors are preventing progress:\n * %s" , strings .Join (messages , "\n * " )),
737
+ }
738
+ }
739
+
579
740
// getOverrideForManifest returns the override and true when override exists for manifest.
580
741
func getOverrideForManifest (overrides []configv1.ComponentOverride , manifest * lib.Manifest ) (configv1.ComponentOverride , bool ) {
581
742
for idx , ov := range overrides {
0 commit comments