@@ -16,6 +16,7 @@ package resources
16
16
17
17
import (
18
18
"context"
19
+ "sync/atomic"
19
20
20
21
"github.com/sirupsen/logrus"
21
22
"k8s.io/apimachinery/pkg/api/meta"
@@ -39,28 +40,47 @@ func pagedList(
39
40
* model.KVPairList ,
40
41
error ,
41
42
) {
43
+ // Wrap our incoming listFunc with one that stashes the revision and number
44
+ // of items we've seen so far. This allows us to use the more efficient
45
+ // EachListItem() method, while also capturing the list metadata that we
46
+ // need.
47
+ listResourceVersion := ""
48
+ var numItemsLoaded atomic.Int64 // listFunc is called from background goroutine.
49
+ listFunc = func (ctx context.Context , opts metav1.ListOptions ) (runtime.Object , error ) {
50
+ obj , err := listFunc (ctx , opts )
51
+ m , err := meta .ListAccessor (obj )
52
+ if err != nil {
53
+ return nil , err
54
+ }
55
+ if m .GetResourceVersion () != "" {
56
+ listResourceVersion = m .GetResourceVersion ()
57
+ }
58
+ numItemsLoaded .Add (int64 (meta .LenList (obj )))
59
+ return obj , err
60
+ }
42
61
lp := pager .New (listFunc )
62
+
43
63
opts := metav1.ListOptions {ResourceVersion : revision }
44
64
if revision != "" {
45
65
opts .ResourceVersionMatch = metav1 .ResourceVersionMatchNotOlderThan
46
66
}
47
- result , isPaged , err := lp .List (ctx , opts )
48
- if err != nil {
49
- return nil , K8sErrorToCalico (err , list )
50
- }
51
- logCtx := log .WithField ("pagedList" , isPaged )
52
- logCtx .Debug ("List() call completed, convert results" )
53
67
54
- // For each item in the response, convert it to a KVPair and add it to the list.
55
- kvps := []* model.KVPair {}
56
- err = meta .EachListItem (result , func (obj runtime.Object ) error {
68
+ var kvps []* model.KVPair
69
+ err := lp .EachListItem (ctx , opts , func (obj runtime.Object ) error {
57
70
res := obj .(Resource )
58
71
result , err := toKVPs (res )
59
72
if err != nil {
60
- logCtx .WithError (err ).WithField ("Item" , res ).Warning ("unable to process resource, skipping" )
73
+ log .WithError (err ).WithField ("Item" , res ).Warning ("Unable to process resource, skipping" )
61
74
return nil
62
75
}
63
76
if result != nil {
77
+ if kvps == nil {
78
+ // Try to guess a suitable result slice capacity. In practice,
79
+ // this will be the size of the first page (but that's usually
80
+ // the only page.)
81
+ ratio := len (result )
82
+ kvps = make ([]* model.KVPair , 0 , int (numItemsLoaded .Load ())* ratio )
83
+ }
64
84
kvps = append (kvps , result ... )
65
85
}
66
86
return nil
@@ -69,13 +89,12 @@ func pagedList(
69
89
return nil , K8sErrorToCalico (err , list )
70
90
}
71
91
72
- // Extract list revision information.
73
- m , err := meta .ListAccessor (result )
74
- if err != nil {
75
- return nil , err
92
+ if listResourceVersion == "" {
93
+ log .WithField ("list" , list ).Panic ("Failed to extract resource version from list." )
76
94
}
95
+
77
96
return & model.KVPairList {
78
97
KVPairs : kvps ,
79
- Revision : m . GetResourceVersion () ,
98
+ Revision : listResourceVersion ,
80
99
}, nil
81
100
}
0 commit comments