Skip to content

Commit 924a1c7

Browse files
committed
Use more effiicent ListPager API.
Use ListPager.EachListItem instead of ListPager.List. Allows for incremental conversion page-by-page instead of loading up all pages into an upstream-format List object and then converting en-masse.
1 parent 2b55f03 commit 924a1c7

File tree

2 files changed

+79
-54
lines changed

2 files changed

+79
-54
lines changed

libcalico-go/lib/backend/k8s/resources/list_pager.go

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package resources
1616

1717
import (
1818
"context"
19+
"sync/atomic"
1920

2021
"github.com/sirupsen/logrus"
2122
"k8s.io/apimachinery/pkg/api/meta"
@@ -39,28 +40,47 @@ func pagedList(
3940
*model.KVPairList,
4041
error,
4142
) {
42-
lp := pager.New(listFunc)
43+
// Wrap our incoming listFunc with one that stashes the revision and number
44+
// of items we've seen so far. This allows us to use the more efficient
45+
// EachListItem() method, while also capturing the list metadata that we
46+
// need.
47+
listResourceVersion := ""
48+
var numItemsLoaded atomic.Int64 // listFunc is called from background goroutine.
49+
shimmedListFunc := func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
50+
obj, err := listFunc(ctx, opts)
51+
m, err := meta.ListAccessor(obj)
52+
if err != nil {
53+
return nil, err
54+
}
55+
if m.GetResourceVersion() != "" {
56+
listResourceVersion = m.GetResourceVersion()
57+
}
58+
numItemsLoaded.Add(int64(meta.LenList(obj)))
59+
return obj, err
60+
}
61+
lp := pager.New(shimmedListFunc)
62+
4363
opts := metav1.ListOptions{ResourceVersion: revision}
4464
if revision != "" {
4565
opts.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
4666
}
47-
result, isPaged, err := lp.List(ctx, opts)
48-
if err != nil {
49-
return nil, K8sErrorToCalico(err, list)
50-
}
51-
logCtx := log.WithField("pagedList", isPaged)
52-
logCtx.Debug("List() call completed, convert results")
5367

54-
// For each item in the response, convert it to a KVPair and add it to the list.
55-
kvps := []*model.KVPair{}
56-
err = meta.EachListItem(result, func(obj runtime.Object) error {
68+
var kvps []*model.KVPair
69+
err := lp.EachListItem(ctx, opts, func(obj runtime.Object) error {
5770
res := obj.(Resource)
5871
result, err := toKVPs(res)
5972
if err != nil {
60-
logCtx.WithError(err).WithField("Item", res).Warning("unable to process resource, skipping")
73+
log.WithError(err).WithField("Item", res).Warning("Unable to process resource, skipping")
6174
return nil
6275
}
6376
if result != nil {
77+
if kvps == nil {
78+
// Try to guess a suitable result slice capacity. In practice,
79+
// this will be the size of the first page (but that's usually
80+
// the only page.)
81+
ratio := len(result)
82+
kvps = make([]*model.KVPair, 0, int(numItemsLoaded.Load())*ratio)
83+
}
6484
kvps = append(kvps, result...)
6585
}
6686
return nil
@@ -69,13 +89,12 @@ func pagedList(
6989
return nil, K8sErrorToCalico(err, list)
7090
}
7191

72-
// Extract list revision information.
73-
m, err := meta.ListAccessor(result)
74-
if err != nil {
75-
return nil, err
92+
if listResourceVersion == "" {
93+
log.WithField("list", list).Panic("Failed to extract resource version from list.")
7694
}
95+
7796
return &model.KVPairList{
7897
KVPairs: kvps,
79-
Revision: m.GetResourceVersion(),
98+
Revision: listResourceVersion,
8099
}, nil
81100
}

libcalico-go/lib/backend/k8s/resources/profile.go

Lines changed: 44 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -186,54 +186,60 @@ func (c *profileClient) List(ctx context.Context, list model.ListInterface, revi
186186
return nil, err
187187
}
188188

189-
// Enumerate matching namespaces, paginated.
190-
listFunc := func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
191-
if saName != "" {
192-
// We've been asked to list a particular service account, skip
193-
// listing namespaces.
194-
return &v1.NamespaceList{}, nil
189+
var nsKVPs *model.KVPairList
190+
if saName == "" {
191+
// We haven't been asked for a specific service account, so we
192+
// must query the namespaces.
193+
listFunc := func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
194+
if nsName != "" {
195+
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", nsName).String()
196+
}
197+
return c.clientSet.CoreV1().Namespaces().List(ctx, opts)
195198
}
196-
if nsName != "" {
197-
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", nsName).String()
199+
convertFunc := func(r Resource) ([]*model.KVPair, error) {
200+
ns := r.(*v1.Namespace)
201+
kvp, err := c.getNsKv(ns)
202+
if err != nil {
203+
return nil, err
204+
}
205+
return []*model.KVPair{kvp}, nil
198206
}
199-
return c.clientSet.CoreV1().Namespaces().List(ctx, opts)
200-
}
201-
convertFunc := func(r Resource) ([]*model.KVPair, error) {
202-
ns := r.(*v1.Namespace)
203-
kvp, err := c.getNsKv(ns)
207+
nsKVPs, err = pagedList(ctx, logContext.WithField("from", "namespaces"), nsRev, list, convertFunc, listFunc)
204208
if err != nil {
205209
return nil, err
206210
}
207-
return []*model.KVPair{kvp}, nil
208-
}
209-
nsKVPs, err := pagedList(ctx, logContext.WithField("from", "namespaces"), nsRev, list, convertFunc, listFunc)
210-
if err != nil {
211-
return nil, err
212-
}
213-
214-
// Enumerate matching service accounts, paginated.
215-
listFunc = func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
216-
if nsName != "" && saName == "" {
217-
// We've been asked to list a particular namespace, skip
218-
// listing service accounts.
219-
return &v1.ServiceAccountList{}, nil
211+
} else {
212+
// We have been asked for a specific service account, so we don't
213+
// need to query namespaces.
214+
nsKVPs = &model.KVPairList{}
215+
}
216+
217+
var saKVPs *model.KVPairList
218+
if nsName == "" || saName != "" {
219+
// We haven't been asked for a specific namespace, so we need to
220+
// query the service accounts.
221+
listFunc := func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
222+
if saName != "" {
223+
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", saName).String()
224+
}
225+
return c.clientSet.CoreV1().ServiceAccounts(nsName).List(ctx, opts)
220226
}
221-
if saName != "" {
222-
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", saName).String()
227+
convertFunc := func(r Resource) ([]*model.KVPair, error) {
228+
sa := r.(*v1.ServiceAccount)
229+
kvp, err := c.getSaKv(sa)
230+
if err != nil {
231+
return nil, err
232+
}
233+
return []*model.KVPair{kvp}, nil
223234
}
224-
return c.clientSet.CoreV1().ServiceAccounts(nsName).List(ctx, opts)
225-
}
226-
convertFunc = func(r Resource) ([]*model.KVPair, error) {
227-
sa := r.(*v1.ServiceAccount)
228-
kvp, err := c.getSaKv(sa)
235+
saKVPs, err = pagedList(ctx, logContext.WithField("from", "serviceaccounts"), saRev, list, convertFunc, listFunc)
229236
if err != nil {
230237
return nil, err
231238
}
232-
return []*model.KVPair{kvp}, nil
233-
}
234-
saKVPs, err := pagedList(ctx, logContext.WithField("from", "serviceaccounts"), saRev, list, convertFunc, listFunc)
235-
if err != nil {
236-
return nil, err
239+
} else {
240+
// We have been asked for a specific namespace, so we don't need to
241+
// query service accounts.
242+
saKVPs = &model.KVPairList{}
237243
}
238244

239245
// Return a merged KVPairList including both results and the default-allow profile.

0 commit comments

Comments
 (0)