Skip to content

Commit a4bf191

Browse files
authored
Passing page limit to cach config instead of override. (#452)
* passing page limit to cache config * adding error log to optimized list watcher
1 parent 0a0c86b commit a4bf191

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

controllers/custom/builder.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,16 @@ func (b *Builder) Complete(reconciler Reconciler) (healthz.Checker, error) {
113113
workqueue.DefaultControllerRateLimiter(), b.options.Name)
114114

115115
optimizedListWatch := newOptimizedListWatcher(b.ctx, b.clientSet.CoreV1().RESTClient(),
116-
b.converter.Resource(), b.options.Namespace, b.options.PageLimit, b.converter)
116+
b.converter.Resource(), b.options.Namespace, b.converter, b.log.WithName("listWatcher"))
117117

118118
// Create the config for low level controller with the custom converter
119119
// list and watch
120120
config := &cache.Config{
121-
Queue: cache.NewDeltaFIFO(b.converter.Indexer, b.dataStore),
122-
ListerWatcher: optimizedListWatch,
123-
ObjectType: b.converter.ResourceType(),
124-
FullResyncPeriod: b.options.ResyncPeriod,
121+
Queue: cache.NewDeltaFIFO(b.converter.Indexer, b.dataStore),
122+
ListerWatcher: optimizedListWatch,
123+
WatchListPageSize: int64(b.options.PageLimit),
124+
ObjectType: b.converter.ResourceType(),
125+
FullResyncPeriod: b.options.ResyncPeriod,
125126
Process: func(obj interface{}, _ bool) error {
126127
// from oldest to newest
127128
for _, d := range obj.(cache.Deltas) {

controllers/custom/custom_controller.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/condition"
2323
"github.com/go-logr/logr"
24+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2425
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2526
"k8s.io/apimachinery/pkg/runtime"
2627
"k8s.io/apimachinery/pkg/types"
@@ -178,23 +179,26 @@ func (c *CustomController) WaitForCacheSync(controller cache.Controller) {
178179

179180
// newOptimizedListWatcher returns a list watcher with a custom list function that converts the
180181
// response for each page using the converter function and returns a general watcher
181-
func newOptimizedListWatcher(ctx context.Context, restClient cache.Getter, resource string, namespace string, limit int,
182-
converter Converter) *cache.ListWatch {
182+
func newOptimizedListWatcher(ctx context.Context, restClient cache.Getter, resource string, namespace string,
183+
converter Converter, log logr.Logger) *cache.ListWatch {
183184

184185
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
185186
list, err := restClient.Get().
186187
Namespace(namespace).
187188
Resource(resource).
188-
// This needs to be done because just setting the limit using option's
189-
// Limit is being overridden and the response is returned without pagination.
190189
VersionedParams(&metav1.ListOptions{
191-
Limit: int64(limit),
190+
Limit: options.Limit,
192191
Continue: options.Continue,
193192
}, metav1.ParameterCodec).
194193
Do(ctx).
195194
Get()
196195
if err != nil {
197-
return list, err
196+
if statusErr, ok := err.(*apierrors.StatusError); ok {
197+
log.Error(err, "List operation error", "code", statusErr.Status().Code)
198+
} else {
199+
log.Error(err, "List operation error")
200+
}
201+
return nil, err
198202
}
199203
// Strip down the the list before passing the paginated response back to
200204
// the pager function
@@ -206,11 +210,20 @@ func newOptimizedListWatcher(ctx context.Context, restClient cache.Getter, resou
206210
// before storing the object in the data store.
207211
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
208212
options.Watch = true
209-
return restClient.Get().
213+
watch, err := restClient.Get().
210214
Namespace(namespace).
211215
Resource(resource).
212216
VersionedParams(&options, metav1.ParameterCodec).
213217
Watch(ctx)
218+
if err != nil {
219+
if statusErr, ok := err.(*apierrors.StatusError); ok {
220+
log.Error(err, "Watch operation error", "code", statusErr.Status().Code)
221+
} else {
222+
log.Error(err, "Watch operation error")
223+
}
224+
return nil, err
225+
}
226+
return watch, err
214227
}
215228
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
216229
}

0 commit comments

Comments
 (0)