Skip to content

Commit 4a893e9

Browse files
authored
feat: add option to convert empty replicasets to delete event (#225)
1 parent c5eb56b commit 4a893e9

File tree

5 files changed

+44
-14
lines changed

5 files changed

+44
-14
lines changed

cmd/dump/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func run(ctx context.Context) error {
6666

6767
log = log.WithField("k8s_version", v.Full())
6868

69-
delta, err := controller.CollectSingleSnapshot(ctx, log, clusterID, clientset, dynamicClient, metricsClient, cfg.Controller, v)
69+
delta, err := controller.CollectSingleSnapshot(ctx, log, clusterID, clientset, dynamicClient, metricsClient, cfg.Controller, v, cfg.SelfPod.Namespace)
7070
if err != nil {
7171
return err
7272
}

internal/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ type Controller struct {
135135
AnnotationsMaxLength string `mapstructure:"annotations_max_length"`
136136
ForcePagination bool `mapstructure:"force_pagination"`
137137
PageSize int64 `mapstructure:"page_size"`
138+
FilterEmptyReplicaSets bool `mapstructure:"filter_empty_replica_sets"`
138139

139140
// DisabledInformers contains a list of informers to disable,
140141
// for example:

internal/services/controller/controller.go

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type conditionalInformer struct {
102102
permissionVerbs []string
103103
isApplied bool
104104
isResourceInError bool
105+
transformers transformers.Transformers
105106
}
106107

107108
func (i *conditionalInformer) Name() string {
@@ -120,6 +121,7 @@ func CollectSingleSnapshot(ctx context.Context,
120121
metricsClient versioned.Interface,
121122
cfg *config.Controller,
122123
v version.Interface,
124+
castwareNamespace string,
123125
) (*castai.Delta, error) {
124126
tweakListOptions := func(options *metav1.ListOptions) {
125127
if cfg.ForcePagination && options.ResourceVersion == "0" {
@@ -131,7 +133,7 @@ func CollectSingleSnapshot(ctx context.Context,
131133
f := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithTweakListOptions(tweakListOptions))
132134
df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, 0, metav1.NamespaceAll, tweakListOptions)
133135

134-
defaultInformers := getDefaultInformers(f)
136+
defaultInformers := getDefaultInformers(f, castwareNamespace, cfg.FilterEmptyReplicaSets)
135137
conditionalInformers := getConditionalInformers(clientset, cfg, f, df, metricsClient, log)
136138
additionalTransformers := createAdditionalTransformers(cfg)
137139

@@ -146,7 +148,8 @@ func CollectSingleSnapshot(ctx context.Context,
146148

147149
handledInformers := map[string]*custominformers.HandledInformer{}
148150
for typ, i := range defaultInformers {
149-
handledInformers[typ.String()] = custominformers.NewHandledInformer(log, queue, i.informer, typ, i.filters, additionalTransformers...)
151+
transformers := append(i.transformers, additionalTransformers...)
152+
handledInformers[typ.String()] = custominformers.NewHandledInformer(log, queue, i.informer, typ, i.filters, transformers...)
150153
}
151154
for typ, i := range handledConditionalInformers {
152155
handledInformers[typ] = i
@@ -208,6 +211,7 @@ func New(
208211
agentVersion *config.AgentVersion,
209212
healthzProvider *HealthzProvider,
210213
selfSubjectAccessReview authorizationtypev1.SelfSubjectAccessReviewInterface,
214+
castwareNamespace string,
211215
) *Controller {
212216
healthzProvider.Initializing()
213217

@@ -225,7 +229,7 @@ func New(
225229
df := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, defaultResync, metav1.NamespaceAll, tweakListOptions)
226230
discovery := clientset.Discovery()
227231

228-
defaultInformers := getDefaultInformers(f)
232+
defaultInformers := getDefaultInformers(f, castwareNamespace, cfg.FilterEmptyReplicaSets)
229233
conditionalInformers := getConditionalInformers(clientset, cfg, f, df, metricsClient, log)
230234
additionalTransformers := createAdditionalTransformers(cfg)
231235

@@ -236,7 +240,8 @@ func New(
236240
if !informerEnabled(cfg, name) {
237241
continue
238242
}
239-
handledInformers[name] = custominformers.NewHandledInformer(log, queue, i.informer, typ, i.filters, additionalTransformers...)
243+
transformers := append(i.transformers, additionalTransformers...)
244+
handledInformers[name] = custominformers.NewHandledInformer(log, queue, i.informer, typ, i.filters, transformers...)
240245
}
241246

242247
eventType := reflect.TypeOf(&corev1.Event{})
@@ -448,8 +453,9 @@ func startConditionalInformers(ctx context.Context,
448453
log.Infof("Starting conditional informer for %v", informer.Name())
449454
conditionalInformers[i].isApplied = true
450455

456+
transformers := append(informer.transformers, additionalTransformers...)
451457
name := fmt.Sprintf("%s::%s", informer.groupVersion.String(), informer.kind)
452-
handledInformer := custominformers.NewHandledInformer(log, queue, informer.informerFactory(), informer.apiType, nil, additionalTransformers...)
458+
handledInformer := custominformers.NewHandledInformer(log, queue, informer.informerFactory(), informer.apiType, nil, transformers...)
453459
handledInformers[name] = handledInformer
454460

455461
go handledInformer.Run(ctx.Done())
@@ -985,11 +991,12 @@ func getConditionalInformers(clientset kubernetes.Interface, cfg *config.Control
985991
}
986992

987993
type defaultInformer struct {
988-
informer cache.SharedInformer
989-
filters filters.Filters
994+
informer cache.SharedInformer
995+
filters filters.Filters
996+
transformers transformers.Transformers
990997
}
991998

992-
func getDefaultInformers(f informers.SharedInformerFactory) map[reflect.Type]defaultInformer {
999+
func getDefaultInformers(f informers.SharedInformerFactory, castwareNamespace string, filterEmptyReplicaSets bool) map[reflect.Type]defaultInformer {
9931000
return map[reflect.Type]defaultInformer{
9941001
reflect.TypeOf(&corev1.Node{}): {informer: f.Core().V1().Nodes().Informer()},
9951002
reflect.TypeOf(&corev1.Pod{}): {informer: f.Core().V1().Pods().Informer()},
@@ -998,11 +1005,28 @@ func getDefaultInformers(f informers.SharedInformerFactory) map[reflect.Type]def
9981005
reflect.TypeOf(&corev1.ReplicationController{}): {informer: f.Core().V1().ReplicationControllers().Informer()},
9991006
reflect.TypeOf(&corev1.Namespace{}): {informer: f.Core().V1().Namespaces().Informer()},
10001007
reflect.TypeOf(&appsv1.Deployment{}): {informer: f.Apps().V1().Deployments().Informer()},
1001-
reflect.TypeOf(&appsv1.ReplicaSet{}): {informer: f.Apps().V1().ReplicaSets().Informer()},
1002-
reflect.TypeOf(&appsv1.DaemonSet{}): {informer: f.Apps().V1().DaemonSets().Informer()},
1003-
reflect.TypeOf(&appsv1.StatefulSet{}): {informer: f.Apps().V1().StatefulSets().Informer()},
1004-
reflect.TypeOf(&storagev1.StorageClass{}): {informer: f.Storage().V1().StorageClasses().Informer()},
1005-
reflect.TypeOf(&batchv1.Job{}): {informer: f.Batch().V1().Jobs().Informer()},
1008+
reflect.TypeOf(&appsv1.ReplicaSet{}): {
1009+
informer: f.Apps().V1().ReplicaSets().Informer(),
1010+
transformers: transformers.Transformers{
1011+
func(e castai.EventType, obj interface{}) (castai.EventType, interface{}) {
1012+
replicaSet, ok := obj.(*appsv1.ReplicaSet)
1013+
if !ok || !filterEmptyReplicaSets {
1014+
return e, obj
1015+
}
1016+
1017+
if e == castai.EventDelete || replicaSet.Namespace == castwareNamespace ||
1018+
(replicaSet.Spec.Replicas != nil && *replicaSet.Spec.Replicas > 0 && replicaSet.Status.Replicas > 0) || replicaSet.OwnerReferences == nil {
1019+
return e, obj
1020+
}
1021+
1022+
return castai.EventDelete, obj
1023+
},
1024+
},
1025+
},
1026+
reflect.TypeOf(&appsv1.DaemonSet{}): {informer: f.Apps().V1().DaemonSets().Informer()},
1027+
reflect.TypeOf(&appsv1.StatefulSet{}): {informer: f.Apps().V1().StatefulSets().Informer()},
1028+
reflect.TypeOf(&storagev1.StorageClass{}): {informer: f.Storage().V1().StorageClasses().Informer()},
1029+
reflect.TypeOf(&batchv1.Job{}): {informer: f.Batch().V1().Jobs().Informer()},
10061030
reflect.TypeOf(&corev1.Service{}): {
10071031
informer: f.Core().V1().Services().Informer(),
10081032
filters: filters.Filters{

internal/services/controller/controller_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ func TestController_ShouldReceiveDeltasBasedOnAvailableResources(t *testing.T) {
230230
agentVersion,
231231
NewHealthzProvider(defaultHealthzCfg, log),
232232
fakeAuthorization.SelfSubjectAccessReviews(),
233+
"",
233234
)
234235

235236
if mockDiscovery != nil {
@@ -391,6 +392,7 @@ func TestController_ShouldSendByInterval(t *testing.T) {
391392
agentVersion,
392393
NewHealthzProvider(defaultHealthzCfg, log),
393394
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
395+
"",
394396
)
395397

396398
ctrl.Start(ctx.Done())
@@ -517,6 +519,7 @@ func TestController_ShouldCancelAndRestartAfterFailToSend(t *testing.T) {
517519
agentVersion,
518520
NewHealthzProvider(defaultHealthzCfg, log),
519521
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
522+
"",
520523
)
521524

522525
ctrl.Start(ctx.Done())
@@ -1266,6 +1269,7 @@ func TestCollectSingleSnapshot(t *testing.T) {
12661269
PrepTimeout: 10 * time.Second,
12671270
},
12681271
version,
1272+
"",
12691273
)
12701274
r.NoError(err)
12711275
r.NotNil(snapshot)

internal/services/controller/worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func Loop(
6060
agentVersion,
6161
healthzProvider,
6262
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
63+
cfg.SelfPod.Namespace,
6364
)
6465

6566
ctrl.Start(ctrlCtx.Done())

0 commit comments

Comments
 (0)