From 12ebdb8d697f688666c9c7b8577888b8b9637a91 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Wed, 8 Jun 2022 23:51:14 -0500 Subject: [PATCH 1/8] (feat/refactor): Centralize Informers Instead of creating different objects (from 5 different packages) that each manage their own informer collections, create a single object that runs a set of informers. A pointer to this object is then shared with all objects that need it Signed-off-by: Keith Mattix II --- cmd/osm-controller/osm-controller.go | 29 ++- cmd/osm-injector/osm-injector.go | 20 +- pkg/k8s/client.go | 108 +++-------- pkg/k8s/client_test.go | 265 +++++++++++++++++++++++---- pkg/k8s/errors.go | 2 - pkg/k8s/informers/informers.go | 161 ++++++++++++++++ pkg/k8s/informers/init.go | 229 +++++++++++++++++++++++ pkg/k8s/informers/types.go | 97 ++++++++++ pkg/k8s/types.go | 12 +- pkg/smi/client_test.go | 14 +- 10 files changed, 786 insertions(+), 151 deletions(-) create mode 100644 pkg/k8s/informers/informers.go create mode 100644 pkg/k8s/informers/init.go create mode 100644 pkg/k8s/informers/types.go diff --git a/cmd/osm-controller/osm-controller.go b/cmd/osm-controller/osm-controller.go index ad1a3d2b4b..59e5b57d25 100644 --- a/cmd/osm-controller/osm-controller.go +++ b/cmd/osm-controller/osm-controller.go @@ -14,6 +14,9 @@ import ( "time" "github.com/pkg/errors" + smiAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned" + smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned" + smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned" "github.com/spf13/pflag" admissionv1 "k8s.io/api/admissionregistration/v1" corev1 "k8s.io/api/core/v1" @@ -43,6 +46,7 @@ import ( "github.com/openservicemesh/osm/pkg/ingress" "github.com/openservicemesh/osm/pkg/k8s" "github.com/openservicemesh/osm/pkg/k8s/events" + "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/logger" "github.com/openservicemesh/osm/pkg/messaging" "github.com/openservicemesh/osm/pkg/metricsstore" @@ -151,6 +155,7 @@ func main() { } kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) policyClient := policyClientset.NewForConfigOrDie(kubeConfig) + configClient := configClientset.NewForConfigOrDie(kubeConfig) // Initialize the generic Kubernetes event recorder and associate it with the osm-controller pod resource controllerPod, err := getOSMControllerPod(kubeClient) @@ -176,17 +181,23 @@ func main() { msgBroker := messaging.NewBroker(stop) - // This component will be watching resources in the config.openservicemesh.io API group - cfg, err := configurator.NewConfigurator(configClientset.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmMeshConfigName, msgBroker) + smiTrafficSplitClientSet := smiTrafficSplitClient.NewForConfigOrDie(kubeConfig) + smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(kubeConfig) + smiTrafficTargetClientSet := smiAccessClient.NewForConfigOrDie(kubeConfig) + + informerCollection, err := informers.NewInformerCollection(meshName, stop, kubeClient, smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet, configClient, policyClient) if err != nil { - events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io") + events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection") } - k8sClient, err := k8s.NewKubernetesController(kubeClient, policyClient, meshName, stop, msgBroker) + // This component will be watching resources in the config.openservicemesh.io API group + cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker) if err != nil { - events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes Controller") + events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io") } + k8sClient := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker) + meshSpec, err := smi.NewMeshSpecClient(kubeConfig, osmNamespace, k8sClient, stop, msgBroker) if err != nil { events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating MeshSpec") @@ -215,16 +226,16 @@ func main() { } } - var configClient config.Controller + var multiclusterConfigClient config.Controller if cfg.GetFeatureFlags().EnableMulticlusterMode { - if configClient, err = config.NewConfigController(kubeConfig, k8sClient, stop, msgBroker); err != nil { + if multiclusterConfigClient, err = config.NewConfigController(kubeConfig, k8sClient, stop, msgBroker); err != nil { events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes config client") } } - // A nil configClient is passed in if multi cluster mode is not enabled. - kubeProvider := kube.NewClient(k8sClient, configClient, cfg) + // A nil multiclusterConfigClient is passed in if multi cluster mode is not enabled. + kubeProvider := kube.NewClient(k8sClient, multiclusterConfigClient, cfg) endpointsProviders := []endpoint.Provider{kubeProvider} serviceProviders := []service.Provider{kubeProvider} diff --git a/cmd/osm-injector/osm-injector.go b/cmd/osm-injector/osm-injector.go index 3a109080e6..b24d2ef592 100644 --- a/cmd/osm-injector/osm-injector.go +++ b/cmd/osm-injector/osm-injector.go @@ -11,6 +11,9 @@ import ( "time" "github.com/pkg/errors" + smiAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned" + smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned" + smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned" "github.com/spf13/pflag" admissionv1 "k8s.io/api/admissionregistration/v1" corev1 "k8s.io/api/core/v1" @@ -32,6 +35,7 @@ import ( "github.com/openservicemesh/osm/pkg/injector" "github.com/openservicemesh/osm/pkg/k8s" "github.com/openservicemesh/osm/pkg/k8s/events" + "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/logger" "github.com/openservicemesh/osm/pkg/messaging" "github.com/openservicemesh/osm/pkg/metricsstore" @@ -139,6 +143,7 @@ func main() { } kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) policyClient := policyClientset.NewForConfigOrDie(kubeConfig) + configClient := configClientset.NewForConfigOrDie(kubeConfig) // Initialize the generic Kubernetes event recorder and associate it with the osm-injector pod resource injectorPod, err := getInjectorPod(kubeClient) @@ -172,6 +177,16 @@ func main() { msgBroker := messaging.NewBroker(stop) + smiTrafficSplitClientSet := smiTrafficSplitClient.NewForConfigOrDie(kubeConfig) + smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(kubeConfig) + smiTrafficTargetClientSet := smiAccessClient.NewForConfigOrDie(kubeConfig) + + informerCollection, err := informers.NewInformerCollection(meshName, stop, kubeClient, smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet, configClient, policyClient) + + if err != nil { + events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection") + } + // Initialize Configurator to watch resources in the config.openservicemesh.io API group cfg, err := configurator.NewConfigurator(configClientset.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmMeshConfigName, msgBroker) if err != nil { @@ -179,10 +194,7 @@ func main() { } // Initialize kubernetes.Controller to watch kubernetes resources - kubeController, err := k8s.NewKubernetesController(kubeClient, policyClient, meshName, stop, msgBroker, k8s.Namespaces) - if err != nil { - events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes Controller") - } + kubeController := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker, k8s.Namespaces) certOpts, err := getCertOptions() if err != nil { diff --git a/pkg/k8s/client.go b/pkg/k8s/client.go index 424aa09395..ea67a7970a 100644 --- a/pkg/k8s/client.go +++ b/pkg/k8s/client.go @@ -11,40 +11,33 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "k8s.io/utils/pointer" + "github.com/openservicemesh/osm/pkg/announcements" policyv1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1" "github.com/openservicemesh/osm/pkg/envoy" "github.com/openservicemesh/osm/pkg/errcode" policyv1alpha1Client "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned" "github.com/openservicemesh/osm/pkg/messaging" - "github.com/openservicemesh/osm/pkg/announcements" "github.com/openservicemesh/osm/pkg/constants" "github.com/openservicemesh/osm/pkg/identity" + osminformers "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/service" ) // NewKubernetesController returns a new kubernetes.Controller which means to provide access to locally-cached k8s resources -func NewKubernetesController(kubeClient kubernetes.Interface, policyClient policyv1alpha1Client.Interface, meshName string, - stop <-chan struct{}, msgBroker *messaging.Broker, selectInformers ...InformerKey) (Controller, error) { - return newClient(kubeClient, policyClient, meshName, stop, msgBroker, selectInformers...) +func NewKubernetesController(informerCollection *osminformers.InformerCollection, policyClient policyv1alpha1Client.Interface, msgBroker *messaging.Broker, selectInformers ...InformerKey) Controller { + return newClient(informerCollection, policyClient, msgBroker, selectInformers...) } -func newClient(kubeClient kubernetes.Interface, policyClient policyv1alpha1Client.Interface, meshName string, - stop <-chan struct{}, msgBroker *messaging.Broker, selectInformers ...InformerKey) (*client, error) { +func newClient(informerCollection *osminformers.InformerCollection, policyClient policyv1alpha1Client.Interface, msgBroker *messaging.Broker, selectInformers ...InformerKey) *client { // Initialize client object c := &client{ - kubeClient: kubeClient, - policyClient: policyClient, - meshName: meshName, - informers: informerCollection{}, + informers: informerCollection, msgBroker: msgBroker, + policyClient: policyClient, } // Initialize informers @@ -65,35 +58,18 @@ func newClient(kubeClient kubernetes.Interface, policyClient policyv1alpha1Clien informerInitHandlerMap[informer]() } - if err := c.run(stop); err != nil { - log.Error().Err(err).Msg("Could not start Kubernetes Namespaces client") - return nil, err - } - - return c, nil + return c } // Initializes Namespace monitoring func (c *client) initNamespaceMonitor() { - monitorNamespaceLabel := map[string]string{constants.OSMKubeResourceMonitorAnnotation: c.meshName} - - labelSelector := fields.SelectorFromSet(monitorNamespaceLabel).String() - option := informers.WithTweakListOptions(func(opt *metav1.ListOptions) { - opt.LabelSelector = labelSelector - }) - - informerFactory := informers.NewSharedInformerFactoryWithOptions(c.kubeClient, DefaultKubeEventResyncInterval, option) - - // Add informer - c.informers[Namespaces] = informerFactory.Core().V1().Namespaces().Informer() - // Add event handler to informer nsEventTypes := EventTypes{ Add: announcements.NamespaceAdded, Update: announcements.NamespaceUpdated, Delete: announcements.NamespaceDeleted, } - c.informers[Namespaces].AddEventHandler(GetEventHandlerFuncs(nil, nsEventTypes, c.msgBroker)) + c.informers.AddEventHandler(osminformers.InformerKeyNamespace, GetEventHandlerFuncs(nil, nsEventTypes, c.msgBroker)) } // Function to filter K8s meta Objects by OSM's isMonitoredNamespace @@ -107,94 +83,52 @@ func (c *client) shouldObserve(obj interface{}) bool { // Initializes Service monitoring func (c *client) initServicesMonitor() { - informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval) - c.informers[Services] = informerFactory.Core().V1().Services().Informer() - svcEventTypes := EventTypes{ Add: announcements.ServiceAdded, Update: announcements.ServiceUpdated, Delete: announcements.ServiceDeleted, } - c.informers[Services].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker)) + c.informers.AddEventHandler(osminformers.InformerKeyService, GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker)) } // Initializes Service Account monitoring func (c *client) initServiceAccountsMonitor() { - informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval) - c.informers[ServiceAccounts] = informerFactory.Core().V1().ServiceAccounts().Informer() - svcEventTypes := EventTypes{ Add: announcements.ServiceAccountAdded, Update: announcements.ServiceAccountUpdated, Delete: announcements.ServiceAccountDeleted, } - c.informers[ServiceAccounts].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker)) + c.informers.AddEventHandler(osminformers.InformerKeyServiceAccount, GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker)) } func (c *client) initPodMonitor() { - informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval) - c.informers[Pods] = informerFactory.Core().V1().Pods().Informer() - podEventTypes := EventTypes{ Add: announcements.PodAdded, Update: announcements.PodUpdated, Delete: announcements.PodDeleted, } - c.informers[Pods].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, podEventTypes, c.msgBroker)) + c.informers.AddEventHandler(osminformers.InformerKeyPod, GetEventHandlerFuncs(c.shouldObserve, podEventTypes, c.msgBroker)) } func (c *client) initEndpointMonitor() { - informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval) - c.informers[Endpoints] = informerFactory.Core().V1().Endpoints().Informer() - eptEventTypes := EventTypes{ Add: announcements.EndpointAdded, Update: announcements.EndpointUpdated, Delete: announcements.EndpointDeleted, } - c.informers[Endpoints].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, eptEventTypes, c.msgBroker)) -} - -func (c *client) run(stop <-chan struct{}) error { - log.Info().Msg("Namespace controller client started") - var hasSynced []cache.InformerSynced - var names []string - - if c.informers == nil { - return errInitInformers - } - - for name, informer := range c.informers { - if informer == nil { - continue - } - - go informer.Run(stop) - names = append(names, (string)(name)) - log.Info().Msgf("Waiting for %s informer cache sync...", name) - hasSynced = append(hasSynced, informer.HasSynced) - } - - if !cache.WaitForCacheSync(stop, hasSynced...) { - return errSyncingCaches - } - - log.Info().Msgf("Caches for %v synced successfully", names) - - return nil + c.informers.AddEventHandler(osminformers.InformerKeyEndpoints, GetEventHandlerFuncs(c.shouldObserve, eptEventTypes, c.msgBroker)) } // IsMonitoredNamespace returns a boolean indicating if the namespace is among the list of monitored namespaces func (c client) IsMonitoredNamespace(namespace string) bool { - _, exists, _ := c.informers[Namespaces].GetStore().GetByKey(namespace) - return exists + return c.informers.IsMonitoredNamespace(namespace) } // ListMonitoredNamespaces returns all namespaces that the mesh is monitoring. func (c client) ListMonitoredNamespaces() ([]string, error) { var namespaces []string - for _, ns := range c.informers[Namespaces].GetStore().List() { + for _, ns := range c.informers.List(osminformers.InformerKeyNamespace) { namespace, ok := ns.(*corev1.Namespace) if !ok { log.Error().Err(errListingNamespaces).Msg("Failed to list monitored namespaces") @@ -208,7 +142,7 @@ func (c client) ListMonitoredNamespaces() ([]string, error) { // GetService retrieves the Kubernetes Services resource for the given MeshService func (c client) GetService(svc service.MeshService) *corev1.Service { // client-go cache uses / as key - svcIf, exists, err := c.informers[Services].GetStore().GetByKey(svc.NamespacedKey()) + svcIf, exists, err := c.informers.GetByKey(osminformers.InformerKeyService, svc.NamespacedKey()) if exists && err == nil { svc := svcIf.(*corev1.Service) return svc @@ -220,7 +154,7 @@ func (c client) GetService(svc service.MeshService) *corev1.Service { func (c client) ListServices() []*corev1.Service { var services []*corev1.Service - for _, serviceInterface := range c.informers[Services].GetStore().List() { + for _, serviceInterface := range c.informers.List(osminformers.InformerKeyService) { svc := serviceInterface.(*corev1.Service) if !c.IsMonitoredNamespace(svc.Namespace) { @@ -235,7 +169,7 @@ func (c client) ListServices() []*corev1.Service { func (c client) ListServiceAccounts() []*corev1.ServiceAccount { var serviceAccounts []*corev1.ServiceAccount - for _, serviceInterface := range c.informers[ServiceAccounts].GetStore().List() { + for _, serviceInterface := range c.informers.List(osminformers.InformerKeyServiceAccount) { sa := serviceInterface.(*corev1.ServiceAccount) if !c.IsMonitoredNamespace(sa.Namespace) { @@ -248,7 +182,7 @@ func (c client) ListServiceAccounts() []*corev1.ServiceAccount { // GetNamespace returns a Namespace resource if found, nil otherwise. func (c client) GetNamespace(ns string) *corev1.Namespace { - nsIf, exists, err := c.informers[Namespaces].GetStore().GetByKey(ns) + nsIf, exists, err := c.informers.GetByKey(osminformers.InformerKeyNamespace, ns) if exists && err == nil { ns := nsIf.(*corev1.Namespace) return ns @@ -262,7 +196,7 @@ func (c client) GetNamespace(ns string) *corev1.Namespace { func (c client) ListPods() []*corev1.Pod { var pods []*corev1.Pod - for _, podInterface := range c.informers[Pods].GetStore().List() { + for _, podInterface := range c.informers.List(osminformers.InformerKeyPod) { pod := podInterface.(*corev1.Pod) if !c.IsMonitoredNamespace(pod.Namespace) { continue @@ -275,7 +209,7 @@ func (c client) ListPods() []*corev1.Pod { // GetEndpoints returns the endpoint for a given service, otherwise returns nil if not found // or error if the API errored out. func (c client) GetEndpoints(svc service.MeshService) (*corev1.Endpoints, error) { - ep, exists, err := c.informers[Endpoints].GetStore().GetByKey(svc.NamespacedKey()) + ep, exists, err := c.informers.GetByKey(osminformers.InformerKeyEndpoints, svc.NamespacedKey()) if err != nil { return nil, err } diff --git a/pkg/k8s/client_test.go b/pkg/k8s/client_test.go index 6fb81b49ee..ee527fef4d 100644 --- a/pkg/k8s/client_test.go +++ b/pkg/k8s/client_test.go @@ -1,11 +1,11 @@ package k8s import ( + "fmt" "testing" "github.com/golang/mock/gomock" "github.com/google/uuid" - "github.com/stretchr/testify/assert" tassert "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -13,6 +13,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" testclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" "k8s.io/utils/pointer" policyv1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1" @@ -23,13 +24,59 @@ import ( "github.com/openservicemesh/osm/pkg/constants" "github.com/openservicemesh/osm/pkg/identity" + "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/service" ) var ( - testMeshName = "mesh" + testMeshName = "mesh" + k8sInformerKeys = []informers.InformerKey{ + informers.InformerKeyNamespace, + informers.InformerKeyService, + informers.InformerKeyServiceAccount, + informers.InformerKeyPod, + informers.InformerKeyEndpoints, + } ) +func newFakeCacheStore(store map[string]interface{}, keyFunc func(obj interface{}) string) cache.Store { + return &cache.FakeCustomStore{ + AddFunc: func(obj interface{}) error { store[keyFunc(obj)] = obj; return nil }, + UpdateFunc: func(obj interface{}) error { store[keyFunc(obj)] = obj; return nil }, + DeleteFunc: func(obj interface{}) error { delete(store, keyFunc(obj)); return nil }, + ListFunc: func() []interface{} { + var objs []interface{} + for _, obj := range store { + objs = append(objs, obj) + } + return objs + }, + ListKeysFunc: func() []string { + var keys []string + for key := range store { + keys = append(keys, key) + } + return keys + }, + GetFunc: func(obj interface{}) (item interface{}, exists bool, err error) { + item, ok := store[keyFunc(obj)] + if !ok { + return nil, false, nil + } + return item, true, nil + }, + GetByKeyFunc: func(key string) (item interface{}, exists bool, err error) { + item, ok := store[key] + if !ok { + return nil, false, nil + } + return item, true, nil + }, + ResyncFunc: func() error { return nil }, + ReplaceFunc: func(list []interface{}, resourceVersion string) error { return nil }, + } +} + func TestIsMonitoredNamespace(t *testing.T) { testCases := []struct { name string @@ -59,12 +106,23 @@ func TestIsMonitoredNamespace(t *testing.T) { }, } + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + namespaceStore := map[string]interface{}{} + store := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: store, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + store.Add(tc.namespace) actual := c.IsMonitoredNamespace(tc.ns) a.Equal(tc.expected, actual) @@ -101,12 +159,23 @@ func TestGetNamespace(t *testing.T) { }, } + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + namespaceStore := map[string]interface{}{} + store := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: store, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + store.Add(tc.namespace) actual := c.GetNamespace(tc.ns) if tc.expected { @@ -147,13 +216,24 @@ func TestListMonitoredNamespaces(t *testing.T) { }, } + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + namespaceStore := map[string]interface{}{} + store := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: store, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) + c := newClient(ic, nil, nil) for _, ns := range tc.namespaces { - _ = c.informers[Namespaces].GetStore().Add(ns) + store.Add(ns) } actual, err := c.ListMonitoredNamespaces() @@ -205,12 +285,23 @@ func TestGetService(t *testing.T) { }, } + serviceKeyFunc := func(obj interface{}) string { + svc := obj.(*corev1.Service) + return fmt.Sprintf("%s/%s", svc.GetNamespace(), svc.GetName()) + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + serviceStore := map[string]interface{}{} + store := newFakeCacheStore(serviceStore, serviceKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyService: store, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) - _ = c.informers[Services].GetStore().Add(tc.service) + c := newClient(ic, nil, nil) + store.Add(tc.service) actual := c.GetService(tc.svc) if tc.expected { @@ -261,15 +352,34 @@ func TestListServices(t *testing.T) { }, } + serviceKeyFunc := func(obj interface{}) string { + svc := obj.(*corev1.Service) + return fmt.Sprintf("%s/%s", svc.GetNamespace(), svc.GetName()) + } + + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + serviceStore := map[string]interface{}{} + namespaceStore := map[string]interface{}{} + nsStore := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + svcStore := newFakeCacheStore(serviceStore, serviceKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: nsStore, + informers.InformerKeyService: svcStore, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + nsStore.Add(tc.namespace) for _, s := range tc.services { - _ = c.informers[Services].GetStore().Add(s) + svcStore.Add(s) } actual := c.ListServices() @@ -317,15 +427,34 @@ func TestListServiceAccounts(t *testing.T) { }, } + serviceAccountKeyFunc := func(obj interface{}) string { + svcAccount := obj.(*corev1.ServiceAccount) + return fmt.Sprintf("%s/%s", svcAccount.GetNamespace(), svcAccount.GetName()) + } + + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + serviceStore := map[string]interface{}{} + namespaceStore := map[string]interface{}{} + nsStore := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + svcAccountStore := newFakeCacheStore(serviceStore, serviceAccountKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: nsStore, + informers.InformerKeyServiceAccount: svcAccountStore, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + nsStore.Add(tc.namespace) for _, s := range tc.sa { - _ = c.informers[ServiceAccounts].GetStore().Add(s) + svcAccountStore.Add(s) } actual := c.ListServiceAccounts() @@ -373,15 +502,34 @@ func TestListPods(t *testing.T) { }, } + podKeyFunc := func(obj interface{}) string { + pod := obj.(*corev1.Pod) + return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()) + } + + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + podStore := map[string]interface{}{} + namespaceStore := map[string]interface{}{} + nsStore := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + pStore := newFakeCacheStore(podStore, podKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: nsStore, + informers.InformerKeyPod: pStore, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + nsStore.Add(tc.namespace) - for _, s := range tc.pods { - _ = c.informers[Pods].GetStore().Add(s) + for _, p := range tc.pods { + pStore.Add(p) } actual := c.ListPods() @@ -426,12 +574,23 @@ func TestGetEndpoints(t *testing.T) { }, } + endpointsKeyFunc := func(obj interface{}) string { + epts := obj.(*corev1.Endpoints) + return fmt.Sprintf("%s/%s", epts.GetNamespace(), epts.GetName()) + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + endpointsStore := map[string]interface{}{} + eptsStore := newFakeCacheStore(endpointsStore, endpointsKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyEndpoints: eptsStore, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) - _ = c.informers[Endpoints].GetStore().Add(tc.endpoints) + c := newClient(ic, nil, nil) + eptsStore.Add(tc.endpoints) actual, err := c.GetEndpoints(tc.svc) a.Nil(err) @@ -524,16 +683,43 @@ func TestListServiceIdentitiesForService(t *testing.T) { }, } + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + + podKeyFunc := func(obj interface{}) string { + pod := obj.(*corev1.Pod) + return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()) + } + + svcKeyFunc := func(obj interface{}) string { + svc := obj.(*corev1.Service) + return fmt.Sprintf("%s/%s", svc.GetNamespace(), svc.GetName()) + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + podStore := map[string]interface{}{} + namespaceStore := map[string]interface{}{} + serviceStore := map[string]interface{}{} + nsStore := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + pStore := newFakeCacheStore(podStore, podKeyFunc) + svcStore := newFakeCacheStore(serviceStore, svcKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: nsStore, + informers.InformerKeyPod: pStore, + informers.InformerKeyService: svcStore, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + nsStore.Add(tc.namespace) for _, p := range tc.pods { - _ = c.informers[Pods].GetStore().Add(p) + pStore.Add(p) } - _ = c.informers[Services].GetStore().Add(tc.service) + svcStore.Add(tc.service) actual, err := c.ListServiceIdentitiesForService(tc.svc) a.Equal(tc.expectErr, err != nil) @@ -700,10 +886,9 @@ func TestUpdateStatus(t *testing.T) { a := tassert.New(t) kubeClient := testclient.NewSimpleClientset() policyClient := fakePolicyClient.NewSimpleClientset(tc.existingResource.(runtime.Object)) - - c, err := NewKubernetesController(kubeClient, policyClient, testMeshName, make(chan struct{}), nil) + ic, err := informers.NewInformerCollection(testMeshName, nil, kubeClient, nil, nil, nil, nil, policyClient, informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) - + c := NewKubernetesController(ic, policyClient, nil) _, err = c.UpdateStatus(tc.updatedResource) a.Equal(tc.expectErr, err != nil) }) @@ -1020,8 +1205,10 @@ func TestK8sServicesToMeshServices(t *testing.T) { fakeClient := testclient.NewSimpleClientset(tc.svcEndpoints...) stop := make(chan struct{}) - kubeController, err := NewKubernetesController(fakeClient, nil, testMeshName, stop, nil) + ic, err := informers.NewInformerCollection(testMeshName, stop, fakeClient, nil, nil, nil, nil, nil, informers.WithSelectedInformers(k8sInformerKeys...)) assert.Nil(err) + + kubeController := NewKubernetesController(ic, nil, nil) assert.NotNil(kubeController) actual := ServiceToMeshServices(kubeController, tc.svc) diff --git a/pkg/k8s/errors.go b/pkg/k8s/errors.go index 3c0b520c83..b170b32e90 100644 --- a/pkg/k8s/errors.go +++ b/pkg/k8s/errors.go @@ -3,8 +3,6 @@ package k8s import "github.com/pkg/errors" var ( - errSyncingCaches = errors.New("Failed initial cache sync for Namespace informers") - errInitInformers = errors.New("Informer not initialized") errListingNamespaces = errors.New("Failed to list monitored namespaces") errServiceNotFound = errors.New("Service not found") diff --git a/pkg/k8s/informers/informers.go b/pkg/k8s/informers/informers.go new file mode 100644 index 0000000000..de3e409094 --- /dev/null +++ b/pkg/k8s/informers/informers.go @@ -0,0 +1,161 @@ +package informers + +import ( + "github.com/rs/zerolog/log" + smiTrafficAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned" + smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned" + smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned" + policyClientset "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned" +) + +type InformerCollectionOpt func(*InformerCollection) + +func NewInformerCollection(meshName string, stop <-chan struct{}, kubeClient kubernetes.Interface, smiTrafficSplitClient smiTrafficSplitClient.Interface, + smiTrafficSpecClient smiTrafficSpecClient.Interface, smiAccessClient smiTrafficAccessClient.Interface, configClient configClientset.Interface, + policyClient policyClientset.Interface, opts ...InformerCollectionOpt) (*InformerCollection, error) { + + ic := &InformerCollection{ + meshName: meshName, + kubeClient: kubeClient, + smiTrafficSplitClient: smiTrafficSplitClient, + smiTrafficSpecClient: smiTrafficSpecClient, + smiAccessClient: smiAccessClient, + configClient: configClient, + policyClient: policyClient, + informers: make(informerCollection), + } + + for _, opt := range opts { + opt(ic) + } + + // Initialize informers + informerInitHandlerMap := map[InformerKey]informerInit{ + // Kubernetes + InformerKeyNamespace: ic.initNamespaceMonitor, + InformerKeyService: ic.initServicesMonitor, + InformerKeyServiceAccount: ic.initServiceAccountsMonitor, + InformerKeyPod: ic.initPodMonitor, + InformerKeyEndpoints: ic.initEndpointMonitor, + + // SMI + InformerKeyTrafficSplit: ic.initTrafficSplitMonitor, + InformerKeyTrafficTarget: ic.initTrafficTargetMonitor, + InformerKeyHTTPRouteGroup: ic.initHTTPRouteGroupMonitor, + InformerKeyTCPRoute: ic.initTCPRouteMonitor, + + // Config + InformerKeyMeshConfig: ic.initMeshConfigMonitor, + InformerKeyMeshRootCertificate: ic.initMeshRootCertificateMonitor, + + // Policy + InformerKeyEgress: ic.initEgressMonitor, + InformerKeyIngressBackend: ic.initIngressBackendMonitor, + InformerKeyUpstreamTrafficSetting: ic.initUpstreamTrafficSettingMonitor, + InformerKeyRetry: ic.initRetryMonitor, + } + + if len(ic.selectedInformers) == 0 { + // Initialize all informers + ic.selectedInformers = []InformerKey{ + InformerKeyNamespace, + InformerKeyService, + InformerKeyPod, + InformerKeyEndpoints, + InformerKeyServiceAccount, + InformerKeyTrafficSplit, + InformerKeyTrafficTarget, + InformerKeyHTTPRouteGroup, + InformerKeyTCPRoute, + InformerKeyMeshConfig, + InformerKeyMeshRootCertificate, + InformerKeyEgress, + InformerKeyIngressBackend, + InformerKeyUpstreamTrafficSetting, + InformerKeyRetry, + } + } + + for _, key := range ic.selectedInformers { + informerInitHandlerMap[key]() + } + + if err := ic.run(stop); err != nil { + log.Error().Err(err).Msg("Could not start informer collection") + return nil, err + } + + return ic, nil +} + +// WithCustomStore sets the shared store for the informerCollection to reference. +// This store will be used instead of each informer's store. This should typically +// only be used in tests +func WithCustomStores(stores map[InformerKey]cache.Store) InformerCollectionOpt { + return func(ic *InformerCollection) { + ic.customStores = stores + } +} + +func WithSelectedInformers(selectedInformers ...InformerKey) InformerCollectionOpt { + return func(ic *InformerCollection) { + ic.selectedInformers = selectedInformers + } +} + +func (ic *InformerCollection) run(stop <-chan struct{}) error { + log.Info().Msg("Namespace controller client started") + var hasSynced []cache.InformerSynced + var names []string + + if ic.informers == nil { + return errInitInformers + } + + for name, informer := range ic.informers { + if informer == nil { + continue + } + + go informer.Run(make(chan struct{})) + names = append(names, string(name)) + log.Info().Msgf("Waiting for %s informer cache sync...", name) + hasSynced = append(hasSynced, informer.HasSynced) + } + + if !cache.WaitForCacheSync(stop, hasSynced...) { + return errSyncingCaches + } + + log.Info().Msgf("Caches for %v synced successfully", names) + + return nil +} + +func (ic *InformerCollection) AddEventHandler(informerKey InformerKey, handler cache.ResourceEventHandler) { + ic.informers[informerKey].informer.AddEventHandler(handler) +} + +func (ic *InformerCollection) GetByKey(informerKey InformerKey, objectKey string) (interface{}, bool, error) { + informer, ok := ic.informers[informerKey] + if !ok { + // keithmattix: This is the silent failure option, but perhaps we want to return an error? + return nil, false, nil + } + + return informer.GetStore().GetByKey(objectKey) +} + +func (ic *InformerCollection) List(informerKey InformerKey) []interface{} { + informer, ok := ic.informers[informerKey] + if !ok { + // keithmattix: This is the silent failure option, but perhaps we want to return an error? + return nil + } + + return informer.GetStore().List() +} diff --git a/pkg/k8s/informers/init.go b/pkg/k8s/informers/init.go new file mode 100644 index 0000000000..a96710d041 --- /dev/null +++ b/pkg/k8s/informers/init.go @@ -0,0 +1,229 @@ +package informers + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/informers" + + configInformers "github.com/openservicemesh/osm/pkg/gen/client/config/informers/externalversions" + policyInformers "github.com/openservicemesh/osm/pkg/gen/client/policy/informers/externalversions" + smiAccessInformers "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/informers/externalversions" + smiTrafficSpecInformers "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/informers/externalversions" + smiTrafficSplitInformers "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/informers/externalversions" + + "github.com/openservicemesh/osm/pkg/constants" +) + +// IsMonitoredNamespace returns a boolean indicating if the namespace is among the list of monitored namespaces +func (ic InformerCollection) IsMonitoredNamespace(namespace string) bool { + _, exists, _ := ic.informers[InformerKeyNamespace].GetStore().GetByKey(namespace) + return exists +} + +// Initializes Namespace monitoring +func (ic *InformerCollection) initNamespaceMonitor() { + monitorNamespaceLabel := map[string]string{constants.OSMKubeResourceMonitorAnnotation: ic.meshName} + + labelSelector := fields.SelectorFromSet(monitorNamespaceLabel).String() + option := informers.WithTweakListOptions(func(opt *metav1.ListOptions) { + opt.LabelSelector = labelSelector + }) + + informerFactory := informers.NewSharedInformerFactoryWithOptions(ic.kubeClient, DefaultKubeEventResyncInterval, option) + + // Add informer + informer := &informer{ + informer: informerFactory.Core().V1().Namespaces().Informer(), + } + + customStore := ic.customStores[InformerKeyNamespace] + if customStore != nil { + informer.customStore = customStore + } + + ic.informers[InformerKeyNamespace] = informer +} + +// Initializes Service monitoring +func (ic *InformerCollection) initServicesMonitor() { + informerFactory := informers.NewSharedInformerFactory(ic.kubeClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Core().V1().Services().Informer(), + } + + customStore := ic.customStores[InformerKeyService] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyService] = informer +} + +// Initializes Service Account monitoring +func (ic *InformerCollection) initServiceAccountsMonitor() { + informerFactory := informers.NewSharedInformerFactory(ic.kubeClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Core().V1().ServiceAccounts().Informer(), + } + + customStore := ic.customStores[InformerKeyServiceAccount] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyServiceAccount] = informer +} + +func (ic *InformerCollection) initPodMonitor() { + informerFactory := informers.NewSharedInformerFactory(ic.kubeClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Core().V1().Pods().Informer(), + } + + customStore := ic.customStores[InformerKeyPod] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyPod] = informer +} + +func (ic *InformerCollection) initEndpointMonitor() { + informerFactory := informers.NewSharedInformerFactory(ic.kubeClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Core().V1().Endpoints().Informer(), + } + + customStore := ic.customStores[InformerKeyEndpoints] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyEndpoints] = informer +} + +func (ic *InformerCollection) initTrafficSplitMonitor() { + informerFactory := smiTrafficSplitInformers.NewSharedInformerFactory(ic.smiTrafficSplitClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Split().V1alpha3().TrafficSplits().Informer(), + } + + customStore := ic.customStores[InformerKeyTrafficSplit] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyTrafficSplit] = informer +} + +func (ic *InformerCollection) initTrafficTargetMonitor() { + informerFactory := smiAccessInformers.NewSharedInformerFactory(ic.smiAccessClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Access().V1alpha3().TrafficTargets().Informer(), + } + + customStore := ic.customStores[InformerKeyTrafficTarget] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyTrafficTarget] = informer +} + +func (ic *InformerCollection) initHTTPRouteGroupMonitor() { + informerFactory := smiTrafficSpecInformers.NewSharedInformerFactory(ic.smiTrafficSpecClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Specs().V1alpha2().HTTPRouteGroups().Informer(), + } + + customStore := ic.customStores[InformerKeyHTTPRouteGroup] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyHTTPRouteGroup] = informer +} + +func (ic *InformerCollection) initTCPRouteMonitor() { + informerFactory := smiTrafficSpecInformers.NewSharedInformerFactory(ic.smiTrafficSpecClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Specs().V1alpha1().TCPRoutes().Informer(), + } + + customStore := ic.customStores[InformerKeyTCPRoute] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyTCPRoute] = informer +} + +func (ic *InformerCollection) initMeshConfigMonitor() { + informerFactory := configInformers.NewSharedInformerFactory(ic.configClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Config().V1alpha2().MeshConfigs().Informer(), + } + + customStore := ic.customStores[InformerKeyMeshConfig] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyMeshConfig] = informer +} + +func (ic *InformerCollection) initMeshRootCertificateMonitor() { + informerFactory := configInformers.NewSharedInformerFactory(ic.configClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Config().V1alpha2().MeshRootCertificates().Informer(), + } + + customStore := ic.customStores[InformerKeyMeshRootCertificate] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyMeshRootCertificate] = informer +} + +func (ic *InformerCollection) initEgressMonitor() { + informerFactory := policyInformers.NewSharedInformerFactory(ic.policyClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Policy().V1alpha1().Egresses().Informer(), + } + + customStore := ic.customStores[InformerKeyEgress] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyEgress] = informer +} + +func (ic *InformerCollection) initIngressBackendMonitor() { + informerFactory := policyInformers.NewSharedInformerFactory(ic.policyClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Policy().V1alpha1().IngressBackends().Informer(), + } + + customStore := ic.customStores[InformerKeyIngressBackend] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyIngressBackend] = informer +} + +func (ic *InformerCollection) initUpstreamTrafficSettingMonitor() { + informerFactory := policyInformers.NewSharedInformerFactory(ic.policyClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Policy().V1alpha1().UpstreamTrafficSettings().Informer(), + } + + customStore := ic.customStores[InformerKeyUpstreamTrafficSetting] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyUpstreamTrafficSetting] = informer +} + +func (ic *InformerCollection) initRetryMonitor() { + informerFactory := policyInformers.NewSharedInformerFactory(ic.policyClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Policy().V1alpha1().Retries().Informer(), + } + + customStore := ic.customStores[InformerKeyRetry] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyRetry] = informer +} diff --git a/pkg/k8s/informers/types.go b/pkg/k8s/informers/types.go new file mode 100644 index 0000000000..689e304efd --- /dev/null +++ b/pkg/k8s/informers/types.go @@ -0,0 +1,97 @@ +package informers + +import ( + "errors" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + smiTrafficAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned" + smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned" + smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned" + + configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned" + policyClientset "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned" +) + +// InformerKey stores the different Informers we keep for K8s resources +type InformerKey string + +const ( + // Kubernetes + InformerKeyNamespace InformerKey = "Namespace" + InformerKeyService InformerKey = "Service" + InformerKeyPod InformerKey = "Pod" + InformerKeyEndpoints InformerKey = "Endpoints" + InformerKeyServiceAccount InformerKey = "ServiceAccount" + + // SMI + InformerKeyTrafficSplit InformerKey = "TrafficSplit" + InformerKeyTrafficTarget InformerKey = "TrafficTarget" + InformerKeyHTTPRouteGroup InformerKey = "HTTPRouteGroup" + InformerKeyTCPRoute InformerKey = "TCPRoute" + + // Config + InformerKeyMeshConfig InformerKey = "MeshConfig" + InformerKeyMeshRootCertificate InformerKey = "MeshRootCertificate" + + // Policy + InformerKeyEgress InformerKey = "Egress" + InformerKeyIngressBackend InformerKey = "IngressBackend" + InformerKeyUpstreamTrafficSetting InformerKey = "UpstreamTrafficSetting" + InformerKeyRetry InformerKey = "Retry" +) + +const ( + // DefaultKubeEventResyncInterval is the default resync interval for k8s events + // This is set to 0 because we do not need resyncs from k8s client, and have our + // own Ticker to turn on periodic resyncs. + DefaultKubeEventResyncInterval = 0 * time.Second +) + +var ( + errInitInformers = errors.New("informer not initialized") + errSyncingCaches = errors.New("failed initial cache sync for informers") + errReadFromNilInformer = errors.New("failed to read from a nil informer") +) + +type informer struct { + customStore cache.Store + informer cache.SharedIndexInformer +} + +type informerCollection map[InformerKey]*informer + +type getStoreFunc func() cache.Store + +type InformerCollection struct { + informers informerCollection + meshName string + kubeClient kubernetes.Interface + smiTrafficSplitClient smiTrafficSplitClient.Interface + smiTrafficSpecClient smiTrafficSpecClient.Interface + smiAccessClient smiTrafficAccessClient.Interface + configClient configClientset.Interface + policyClient policyClientset.Interface + selectedInformers []InformerKey + customStores map[InformerKey]cache.Store +} + +type informerInit func() + +func (i *informer) GetStore() cache.Store { + if i.customStore != nil { + return i.customStore + } + + return i.informer.GetStore() +} + +func (i *informer) HasSynced() bool { + return i.informer.HasSynced() +} + +func (i *informer) Run(stop <-chan struct{}) { + i.informer.Run(stop) +} diff --git a/pkg/k8s/types.go b/pkg/k8s/types.go index 9d8c0b72c4..bb55701e41 100644 --- a/pkg/k8s/types.go +++ b/pkg/k8s/types.go @@ -8,15 +8,14 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "github.com/openservicemesh/osm/pkg/envoy" policyv1alpha1Client "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned" - "github.com/openservicemesh/osm/pkg/messaging" "github.com/openservicemesh/osm/pkg/identity" + "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/logger" + "github.com/openservicemesh/osm/pkg/messaging" "github.com/openservicemesh/osm/pkg/service" ) @@ -65,15 +64,10 @@ const ( ServiceAccounts InformerKey = "ServiceAccounts" ) -// informerCollection is the type holding the collection of informers we keep -type informerCollection map[InformerKey]cache.SharedIndexInformer - // client is the type used to represent the k8s client for the native k8s resources type client struct { - meshName string - kubeClient kubernetes.Interface policyClient policyv1alpha1Client.Interface - informers informerCollection + informers *informers.InformerCollection msgBroker *messaging.Broker } diff --git a/pkg/smi/client_test.go b/pkg/smi/client_test.go index 2be44f3651..b8b4b6ff19 100644 --- a/pkg/smi/client_test.go +++ b/pkg/smi/client_test.go @@ -23,6 +23,7 @@ import ( "github.com/openservicemesh/osm/pkg/constants" "github.com/openservicemesh/osm/pkg/identity" "github.com/openservicemesh/osm/pkg/k8s" + "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/messaging" "github.com/openservicemesh/osm/pkg/service" "github.com/openservicemesh/osm/pkg/tests" @@ -47,11 +48,22 @@ func bootstrapClient(stop chan struct{}) (*client, *fakeKubeClientSet, error) { smiTrafficSpecClientSet := testTrafficSpecClient.NewSimpleClientset() smiTrafficTargetClientSet := testTrafficTargetClient.NewSimpleClientset() msgBroker := messaging.NewBroker(stop) - kubernetesClient, err := k8s.NewKubernetesController(kubeClient, nil, meshName, stop, msgBroker) + informerCollection, err := informers.NewInformerCollection(meshName, stop, kubeClient, smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet, nil, nil, + informers.WithSelectedInformers( + informers.InformerKeyNamespace, + informers.InformerKeyService, + informers.InformerKeyServiceAccount, + informers.InformerKeyPod, + informers.InformerKeyEndpoints, + ), + ) + if err != nil { return nil, nil, err } + kubernetesClient := k8s.NewKubernetesController(informerCollection, nil, msgBroker) + fakeClientSet := &fakeKubeClientSet{ kubeClient: kubeClient, smiTrafficSplitClientSet: smiTrafficSplitClientSet, From cd24fa8c9d91cfcba14697e3e0660a11fc651461 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Thu, 9 Jun 2022 00:01:36 -0500 Subject: [PATCH 2/8] Update changes from main to use informercollection Signed-off-by: Keith Mattix II --- pkg/k8s/client_test.go | 4 +++- pkg/k8s/informers/informers.go | 8 +++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/k8s/client_test.go b/pkg/k8s/client_test.go index ee527fef4d..f4c56450a9 100644 --- a/pkg/k8s/client_test.go +++ b/pkg/k8s/client_test.go @@ -1243,9 +1243,11 @@ func TestGetPodForProxy(t *testing.T) { tests.NewPodFixture(namespace, "pod-2", tests.BookstoreServiceAccountName, someOthePodLabels), ) - kubeController, err := NewKubernetesController(kubeClient, nil, testMeshName, stop, messaging.NewBroker(nil)) + ic, err := informers.NewInformerCollection(testMeshName, stop, kubeClient, nil, nil, nil, nil, nil, informers.WithSelectedInformers(k8sInformerKeys...)) assert.Nil(err) + kubeController := NewKubernetesController(ic, nil, messaging.NewBroker(nil)) + testCases := []struct { name string pod *corev1.Pod diff --git a/pkg/k8s/informers/informers.go b/pkg/k8s/informers/informers.go index de3e409094..a026eee3e9 100644 --- a/pkg/k8s/informers/informers.go +++ b/pkg/k8s/informers/informers.go @@ -137,7 +137,13 @@ func (ic *InformerCollection) run(stop <-chan struct{}) error { } func (ic *InformerCollection) AddEventHandler(informerKey InformerKey, handler cache.ResourceEventHandler) { - ic.informers[informerKey].informer.AddEventHandler(handler) + i, ok := ic.informers[informerKey] + if !ok { + log.Info().Msgf("attempted to add event handler for nil informer %s", informerKey) + return + } + + i.informer.AddEventHandler(handler) } func (ic *InformerCollection) GetByKey(informerKey InformerKey, objectKey string) (interface{}, bool, error) { From 45436de228a037994d035bb2c75ce818850e5b27 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Thu, 9 Jun 2022 11:03:25 -0500 Subject: [PATCH 3/8] Fix PR comments and startup errors Signed-off-by: Keith Mattix II --- cmd/osm-controller/osm-controller.go | 7 +++- cmd/osm-injector/osm-injector.go | 7 +++- pkg/k8s/client_test.go | 53 +++++++++++++------------- pkg/k8s/informers/informers.go | 56 ++++++++++++++++++++-------- pkg/k8s/informers/init.go | 6 +-- pkg/k8s/informers/types.go | 11 ++---- pkg/smi/client_test.go | 4 +- 7 files changed, 88 insertions(+), 56 deletions(-) diff --git a/cmd/osm-controller/osm-controller.go b/cmd/osm-controller/osm-controller.go index 59e5b57d25..66227b5bf2 100644 --- a/cmd/osm-controller/osm-controller.go +++ b/cmd/osm-controller/osm-controller.go @@ -185,7 +185,12 @@ func main() { smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(kubeConfig) smiTrafficTargetClientSet := smiAccessClient.NewForConfigOrDie(kubeConfig) - informerCollection, err := informers.NewInformerCollection(meshName, stop, kubeClient, smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet, configClient, policyClient) + informerCollection, err := informers.NewInformerCollection(meshName, stop, + informers.WithKubeClient(kubeClient), + informers.WithSMIClients(smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet), + informers.WithConfigClient(configClient), + informers.WithPolicyClient(policyClient), + ) if err != nil { events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection") } diff --git a/cmd/osm-injector/osm-injector.go b/cmd/osm-injector/osm-injector.go index b24d2ef592..2df4bd83a8 100644 --- a/cmd/osm-injector/osm-injector.go +++ b/cmd/osm-injector/osm-injector.go @@ -181,7 +181,12 @@ func main() { smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(kubeConfig) smiTrafficTargetClientSet := smiAccessClient.NewForConfigOrDie(kubeConfig) - informerCollection, err := informers.NewInformerCollection(meshName, stop, kubeClient, smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet, configClient, policyClient) + informerCollection, err := informers.NewInformerCollection(meshName, stop, + informers.WithKubeClient(kubeClient), + informers.WithSMIClients(smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet), + informers.WithConfigClient(configClient), + informers.WithPolicyClient(policyClient), + ) if err != nil { events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection") diff --git a/pkg/k8s/client_test.go b/pkg/k8s/client_test.go index f4c56450a9..ba02f0670c 100644 --- a/pkg/k8s/client_test.go +++ b/pkg/k8s/client_test.go @@ -119,10 +119,11 @@ func TestIsMonitoredNamespace(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyNamespace: store, } - ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) - store.Add(tc.namespace) + _ = store.Add(tc.namespace) actual := c.IsMonitoredNamespace(tc.ns) a.Equal(tc.expected, actual) @@ -172,10 +173,10 @@ func TestGetNamespace(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyNamespace: store, } - ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) - store.Add(tc.namespace) + _ = store.Add(tc.namespace) actual := c.GetNamespace(tc.ns) if tc.expected { @@ -229,11 +230,11 @@ func TestListMonitoredNamespaces(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyNamespace: store, } - ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) for _, ns := range tc.namespaces { - store.Add(ns) + _ = store.Add(ns) } actual, err := c.ListMonitoredNamespaces() @@ -298,10 +299,10 @@ func TestGetService(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyService: store, } - ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) - store.Add(tc.service) + _ = store.Add(tc.service) actual := c.GetService(tc.svc) if tc.expected { @@ -373,13 +374,13 @@ func TestListServices(t *testing.T) { informers.InformerKeyNamespace: nsStore, informers.InformerKeyService: svcStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) - nsStore.Add(tc.namespace) + _ = nsStore.Add(tc.namespace) for _, s := range tc.services { - svcStore.Add(s) + _ = svcStore.Add(s) } actual := c.ListServices() @@ -448,13 +449,13 @@ func TestListServiceAccounts(t *testing.T) { informers.InformerKeyNamespace: nsStore, informers.InformerKeyServiceAccount: svcAccountStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) - nsStore.Add(tc.namespace) + _ = nsStore.Add(tc.namespace) for _, s := range tc.sa { - svcAccountStore.Add(s) + _ = svcAccountStore.Add(s) } actual := c.ListServiceAccounts() @@ -523,13 +524,13 @@ func TestListPods(t *testing.T) { informers.InformerKeyNamespace: nsStore, informers.InformerKeyPod: pStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) - nsStore.Add(tc.namespace) + _ = nsStore.Add(tc.namespace) for _, p := range tc.pods { - pStore.Add(p) + _ = pStore.Add(p) } actual := c.ListPods() @@ -587,10 +588,10 @@ func TestGetEndpoints(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyEndpoints: eptsStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) - eptsStore.Add(tc.endpoints) + _ = eptsStore.Add(tc.endpoints) actual, err := c.GetEndpoints(tc.svc) a.Nil(err) @@ -712,14 +713,14 @@ func TestListServiceIdentitiesForService(t *testing.T) { informers.InformerKeyPod: pStore, informers.InformerKeyService: svcStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, testclient.NewSimpleClientset(), nil, nil, nil, nil, nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) - nsStore.Add(tc.namespace) + _ = nsStore.Add(tc.namespace) for _, p := range tc.pods { - pStore.Add(p) + _ = pStore.Add(p) } - svcStore.Add(tc.service) + _ = svcStore.Add(tc.service) actual, err := c.ListServiceIdentitiesForService(tc.svc) a.Equal(tc.expectErr, err != nil) @@ -886,7 +887,7 @@ func TestUpdateStatus(t *testing.T) { a := tassert.New(t) kubeClient := testclient.NewSimpleClientset() policyClient := fakePolicyClient.NewSimpleClientset(tc.existingResource.(runtime.Object)) - ic, err := informers.NewInformerCollection(testMeshName, nil, kubeClient, nil, nil, nil, nil, policyClient, informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(kubeClient), informers.WithPolicyClient(policyClient), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := NewKubernetesController(ic, policyClient, nil) _, err = c.UpdateStatus(tc.updatedResource) @@ -1205,7 +1206,7 @@ func TestK8sServicesToMeshServices(t *testing.T) { fakeClient := testclient.NewSimpleClientset(tc.svcEndpoints...) stop := make(chan struct{}) - ic, err := informers.NewInformerCollection(testMeshName, stop, fakeClient, nil, nil, nil, nil, nil, informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, stop, informers.WithKubeClient(fakeClient), informers.WithSelectedInformers(k8sInformerKeys...)) assert.Nil(err) kubeController := NewKubernetesController(ic, nil, nil) @@ -1243,7 +1244,7 @@ func TestGetPodForProxy(t *testing.T) { tests.NewPodFixture(namespace, "pod-2", tests.BookstoreServiceAccountName, someOthePodLabels), ) - ic, err := informers.NewInformerCollection(testMeshName, stop, kubeClient, nil, nil, nil, nil, nil, informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, stop, informers.WithKubeClient(kubeClient), informers.WithSelectedInformers(k8sInformerKeys...)) assert.Nil(err) kubeController := NewKubernetesController(ic, nil, messaging.NewBroker(nil)) diff --git a/pkg/k8s/informers/informers.go b/pkg/k8s/informers/informers.go index a026eee3e9..fcdd50a578 100644 --- a/pkg/k8s/informers/informers.go +++ b/pkg/k8s/informers/informers.go @@ -12,21 +12,14 @@ import ( policyClientset "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned" ) -type InformerCollectionOpt func(*InformerCollection) +type InformerCollectionOption func(*InformerCollection) -func NewInformerCollection(meshName string, stop <-chan struct{}, kubeClient kubernetes.Interface, smiTrafficSplitClient smiTrafficSplitClient.Interface, - smiTrafficSpecClient smiTrafficSpecClient.Interface, smiAccessClient smiTrafficAccessClient.Interface, configClient configClientset.Interface, - policyClient policyClientset.Interface, opts ...InformerCollectionOpt) (*InformerCollection, error) { +// NewInformerCollection creates a new InformerCollection +func NewInformerCollection(meshName string, stop <-chan struct{}, opts ...InformerCollectionOption) (*InformerCollection, error) { ic := &InformerCollection{ - meshName: meshName, - kubeClient: kubeClient, - smiTrafficSplitClient: smiTrafficSplitClient, - smiTrafficSpecClient: smiTrafficSpecClient, - smiAccessClient: smiAccessClient, - configClient: configClient, - policyClient: policyClient, - informers: make(informerCollection), + meshName: meshName, + informers: map[InformerKey]*informer{}, } for _, opt := range opts { @@ -92,23 +85,54 @@ func NewInformerCollection(meshName string, stop <-chan struct{}, kubeClient kub return ic, nil } -// WithCustomStore sets the shared store for the informerCollection to reference. +// WithCustomStore sets the shared store for the InformerCollection to reference. // This store will be used instead of each informer's store. This should typically // only be used in tests -func WithCustomStores(stores map[InformerKey]cache.Store) InformerCollectionOpt { +func WithCustomStores(stores map[InformerKey]cache.Store) InformerCollectionOption { return func(ic *InformerCollection) { ic.customStores = stores } } -func WithSelectedInformers(selectedInformers ...InformerKey) InformerCollectionOpt { +// WithSelectedInformers sets the selected informers for the InformerCollection +func WithSelectedInformers(selectedInformers ...InformerKey) InformerCollectionOption { return func(ic *InformerCollection) { ic.selectedInformers = selectedInformers } } +// WithKubeClient sets the kubeClient for the InformerCollection +func WithKubeClient(kubeClient kubernetes.Interface) InformerCollectionOption { + return func(ic *InformerCollection) { + ic.kubeClient = kubeClient + } +} + +// WithSMIClient sets the SMI clients for the InformerCollection +func WithSMIClients(smiTrafficSplitClient smiTrafficSplitClient.Interface, smiTrafficSpecClient smiTrafficSpecClient.Interface, smiAccessClient smiTrafficAccessClient.Interface) InformerCollectionOption { + return func(ic *InformerCollection) { + ic.smiTrafficSplitClient = smiTrafficSplitClient + ic.smiTrafficSpecClient = smiTrafficSpecClient + ic.smiAccessClient = smiAccessClient + } +} + +// WithConfigClient sets the config client for the InformerCollection +func WithConfigClient(configClient configClientset.Interface) InformerCollectionOption { + return func(ic *InformerCollection) { + ic.configClient = configClient + } +} + +// WithPolicyClient sets the policy client for the InformerCollection +func WithPolicyClient(policyClient policyClientset.Interface) InformerCollectionOption { + return func(ic *InformerCollection) { + ic.policyClient = policyClient + } +} + func (ic *InformerCollection) run(stop <-chan struct{}) error { - log.Info().Msg("Namespace controller client started") + log.Info().Msg("InformerCollection started") var hasSynced []cache.InformerSynced var names []string diff --git a/pkg/k8s/informers/init.go b/pkg/k8s/informers/init.go index a96710d041..84aab94823 100644 --- a/pkg/k8s/informers/init.go +++ b/pkg/k8s/informers/init.go @@ -101,7 +101,7 @@ func (ic *InformerCollection) initEndpointMonitor() { func (ic *InformerCollection) initTrafficSplitMonitor() { informerFactory := smiTrafficSplitInformers.NewSharedInformerFactory(ic.smiTrafficSplitClient, DefaultKubeEventResyncInterval) informer := &informer{ - informer: informerFactory.Split().V1alpha3().TrafficSplits().Informer(), + informer: informerFactory.Split().V1alpha2().TrafficSplits().Informer(), } customStore := ic.customStores[InformerKeyTrafficSplit] @@ -127,7 +127,7 @@ func (ic *InformerCollection) initTrafficTargetMonitor() { func (ic *InformerCollection) initHTTPRouteGroupMonitor() { informerFactory := smiTrafficSpecInformers.NewSharedInformerFactory(ic.smiTrafficSpecClient, DefaultKubeEventResyncInterval) informer := &informer{ - informer: informerFactory.Specs().V1alpha2().HTTPRouteGroups().Informer(), + informer: informerFactory.Specs().V1alpha4().HTTPRouteGroups().Informer(), } customStore := ic.customStores[InformerKeyHTTPRouteGroup] @@ -140,7 +140,7 @@ func (ic *InformerCollection) initHTTPRouteGroupMonitor() { func (ic *InformerCollection) initTCPRouteMonitor() { informerFactory := smiTrafficSpecInformers.NewSharedInformerFactory(ic.smiTrafficSpecClient, DefaultKubeEventResyncInterval) informer := &informer{ - informer: informerFactory.Specs().V1alpha1().TCPRoutes().Informer(), + informer: informerFactory.Specs().V1alpha4().TCPRoutes().Informer(), } customStore := ic.customStores[InformerKeyTCPRoute] diff --git a/pkg/k8s/informers/types.go b/pkg/k8s/informers/types.go index 689e304efd..4a33de0a67 100644 --- a/pkg/k8s/informers/types.go +++ b/pkg/k8s/informers/types.go @@ -51,9 +51,8 @@ const ( ) var ( - errInitInformers = errors.New("informer not initialized") - errSyncingCaches = errors.New("failed initial cache sync for informers") - errReadFromNilInformer = errors.New("failed to read from a nil informer") + errInitInformers = errors.New("informer not initialized") + errSyncingCaches = errors.New("failed initial cache sync for informers") ) type informer struct { @@ -61,12 +60,8 @@ type informer struct { informer cache.SharedIndexInformer } -type informerCollection map[InformerKey]*informer - -type getStoreFunc func() cache.Store - type InformerCollection struct { - informers informerCollection + informers map[InformerKey]*informer meshName string kubeClient kubernetes.Interface smiTrafficSplitClient smiTrafficSplitClient.Interface diff --git a/pkg/smi/client_test.go b/pkg/smi/client_test.go index b8b4b6ff19..f6a2ca13cd 100644 --- a/pkg/smi/client_test.go +++ b/pkg/smi/client_test.go @@ -48,7 +48,9 @@ func bootstrapClient(stop chan struct{}) (*client, *fakeKubeClientSet, error) { smiTrafficSpecClientSet := testTrafficSpecClient.NewSimpleClientset() smiTrafficTargetClientSet := testTrafficTargetClient.NewSimpleClientset() msgBroker := messaging.NewBroker(stop) - informerCollection, err := informers.NewInformerCollection(meshName, stop, kubeClient, smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet, nil, nil, + informerCollection, err := informers.NewInformerCollection(meshName, stop, + informers.WithKubeClient(kubeClient), + informers.WithSMIClients(smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet), informers.WithSelectedInformers( informers.InformerKeyNamespace, informers.InformerKeyService, From d71fa54fa97d1ae7866cba9fac1055665e10ad26 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Thu, 9 Jun 2022 11:27:15 -0500 Subject: [PATCH 4/8] Fix nil opt and linting errors Signed-off-by: Keith Mattix II --- pkg/k8s/client_test.go | 14 ++++++------ pkg/k8s/informers/informers.go | 13 +++++++---- pkg/k8s/informers/init.go | 5 +++-- pkg/k8s/informers/types.go | 41 +++++++++++++++++++++------------- 4 files changed, 45 insertions(+), 28 deletions(-) diff --git a/pkg/k8s/client_test.go b/pkg/k8s/client_test.go index ba02f0670c..740b2d2f48 100644 --- a/pkg/k8s/client_test.go +++ b/pkg/k8s/client_test.go @@ -230,7 +230,7 @@ func TestListMonitoredNamespaces(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyNamespace: store, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) for _, ns := range tc.namespaces { @@ -299,7 +299,7 @@ func TestGetService(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyService: store, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) _ = store.Add(tc.service) @@ -374,7 +374,7 @@ func TestListServices(t *testing.T) { informers.InformerKeyNamespace: nsStore, informers.InformerKeyService: svcStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) _ = nsStore.Add(tc.namespace) @@ -449,7 +449,7 @@ func TestListServiceAccounts(t *testing.T) { informers.InformerKeyNamespace: nsStore, informers.InformerKeyServiceAccount: svcAccountStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) _ = nsStore.Add(tc.namespace) @@ -524,7 +524,7 @@ func TestListPods(t *testing.T) { informers.InformerKeyNamespace: nsStore, informers.InformerKeyPod: pStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) _ = nsStore.Add(tc.namespace) @@ -588,7 +588,7 @@ func TestGetEndpoints(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyEndpoints: eptsStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) _ = eptsStore.Add(tc.endpoints) @@ -713,7 +713,7 @@ func TestListServiceIdentitiesForService(t *testing.T) { informers.InformerKeyPod: pStore, informers.InformerKeyService: svcStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), nil, informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) a.Nil(err) c := newClient(ic, nil, nil) _ = nsStore.Add(tc.namespace) diff --git a/pkg/k8s/informers/informers.go b/pkg/k8s/informers/informers.go index fcdd50a578..9afc82ffac 100644 --- a/pkg/k8s/informers/informers.go +++ b/pkg/k8s/informers/informers.go @@ -12,18 +12,20 @@ import ( policyClientset "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned" ) +// InformerCollectionOption is a function that modifies an informer collection type InformerCollectionOption func(*InformerCollection) // NewInformerCollection creates a new InformerCollection func NewInformerCollection(meshName string, stop <-chan struct{}, opts ...InformerCollectionOption) (*InformerCollection, error) { - ic := &InformerCollection{ meshName: meshName, informers: map[InformerKey]*informer{}, } for _, opt := range opts { - opt(ic) + if opt != nil { + opt(ic) + } } // Initialize informers @@ -85,7 +87,7 @@ func NewInformerCollection(meshName string, stop <-chan struct{}, opts ...Inform return ic, nil } -// WithCustomStore sets the shared store for the InformerCollection to reference. +// WithCustomStores sets the shared store for the InformerCollection to reference. // This store will be used instead of each informer's store. This should typically // only be used in tests func WithCustomStores(stores map[InformerKey]cache.Store) InformerCollectionOption { @@ -108,7 +110,7 @@ func WithKubeClient(kubeClient kubernetes.Interface) InformerCollectionOption { } } -// WithSMIClient sets the SMI clients for the InformerCollection +// WithSMIClients sets the SMI clients for the InformerCollection func WithSMIClients(smiTrafficSplitClient smiTrafficSplitClient.Interface, smiTrafficSpecClient smiTrafficSpecClient.Interface, smiAccessClient smiTrafficAccessClient.Interface) InformerCollectionOption { return func(ic *InformerCollection) { ic.smiTrafficSplitClient = smiTrafficSplitClient @@ -160,6 +162,7 @@ func (ic *InformerCollection) run(stop <-chan struct{}) error { return nil } +// AddEventHandler adds an handler to the informer indexed by the given InformerKey func (ic *InformerCollection) AddEventHandler(informerKey InformerKey, handler cache.ResourceEventHandler) { i, ok := ic.informers[informerKey] if !ok { @@ -170,6 +173,7 @@ func (ic *InformerCollection) AddEventHandler(informerKey InformerKey, handler c i.informer.AddEventHandler(handler) } +// GetByKey retrieves an item (based on the given index) from the store of the informer indexed by the given InformerKey func (ic *InformerCollection) GetByKey(informerKey InformerKey, objectKey string) (interface{}, bool, error) { informer, ok := ic.informers[informerKey] if !ok { @@ -180,6 +184,7 @@ func (ic *InformerCollection) GetByKey(informerKey InformerKey, objectKey string return informer.GetStore().GetByKey(objectKey) } +// List returns the contents of the store of the informer indexed by the given InformerKey func (ic *InformerCollection) List(informerKey InformerKey) []interface{} { informer, ok := ic.informers[informerKey] if !ok { diff --git a/pkg/k8s/informers/init.go b/pkg/k8s/informers/init.go index 84aab94823..6c4c177a2d 100644 --- a/pkg/k8s/informers/init.go +++ b/pkg/k8s/informers/init.go @@ -5,12 +5,13 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/informers" - configInformers "github.com/openservicemesh/osm/pkg/gen/client/config/informers/externalversions" - policyInformers "github.com/openservicemesh/osm/pkg/gen/client/policy/informers/externalversions" smiAccessInformers "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/informers/externalversions" smiTrafficSpecInformers "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/informers/externalversions" smiTrafficSplitInformers "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/informers/externalversions" + configInformers "github.com/openservicemesh/osm/pkg/gen/client/config/informers/externalversions" + policyInformers "github.com/openservicemesh/osm/pkg/gen/client/policy/informers/externalversions" + "github.com/openservicemesh/osm/pkg/constants" ) diff --git a/pkg/k8s/informers/types.go b/pkg/k8s/informers/types.go index 4a33de0a67..6ddb927680 100644 --- a/pkg/k8s/informers/types.go +++ b/pkg/k8s/informers/types.go @@ -19,28 +19,39 @@ import ( type InformerKey string const ( - // Kubernetes - InformerKeyNamespace InformerKey = "Namespace" - InformerKeyService InformerKey = "Service" - InformerKeyPod InformerKey = "Pod" - InformerKeyEndpoints InformerKey = "Endpoints" + // InformerKeyNamespace is the InformerKey for a Namespace informer + InformerKeyNamespace InformerKey = "Namespace" + // InformerKeyService is the InformerKey for a Service informer + InformerKeyService InformerKey = "Service" + // InformerKeyPod is the InformerKey for a Pod informer + InformerKeyPod InformerKey = "Pod" + // InformerKeyEndpoints is the InformerKey for a Endpoints informer + InformerKeyEndpoints InformerKey = "Endpoints" + // InformerKeyServiceAccount is the InformerKey for a ServiceAccount informer InformerKeyServiceAccount InformerKey = "ServiceAccount" - // SMI - InformerKeyTrafficSplit InformerKey = "TrafficSplit" - InformerKeyTrafficTarget InformerKey = "TrafficTarget" + // InformerKeyTrafficSplit is the InformerKey for a TrafficSplit informer + InformerKeyTrafficSplit InformerKey = "TrafficSplit" + // InformerKeyTrafficTarget is the InformerKey for a TrafficTarget informer + InformerKeyTrafficTarget InformerKey = "TrafficTarget" + // InformerKeyHTTPRouteGroup is the InformerKey for a HTTPRouteGroup informer InformerKeyHTTPRouteGroup InformerKey = "HTTPRouteGroup" - InformerKeyTCPRoute InformerKey = "TCPRoute" + // InformerKeyTCPRoute is the InformerKey for a TCPRoute informer + InformerKeyTCPRoute InformerKey = "TCPRoute" - // Config - InformerKeyMeshConfig InformerKey = "MeshConfig" + // InformerKeyMeshConfig is the InformerKey for a MeshConfig informer + InformerKeyMeshConfig InformerKey = "MeshConfig" + // InformerKeyMeshRootCertificate is the InformerKey for a MeshRootCertificate informer InformerKeyMeshRootCertificate InformerKey = "MeshRootCertificate" - // Policy - InformerKeyEgress InformerKey = "Egress" - InformerKeyIngressBackend InformerKey = "IngressBackend" + // InformerKeyEgress is the InformerKey for a Egress informer + InformerKeyEgress InformerKey = "Egress" + // InformerKeyIngressBackend is the InformerKey for a IngressBackend informer + InformerKeyIngressBackend InformerKey = "IngressBackend" + // InformerKeyUpstreamTrafficSetting is the InformerKey for a UpstreamTrafficSetting informer InformerKeyUpstreamTrafficSetting InformerKey = "UpstreamTrafficSetting" - InformerKeyRetry InformerKey = "Retry" + // InformerKeyRetry is the InformerKey for a Retry informer + InformerKeyRetry InformerKey = "Retry" ) const ( From ceb522c86f0778a17a49c74854d395d5bdabc721 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Thu, 9 Jun 2022 11:35:24 -0500 Subject: [PATCH 5/8] Add comment describing WithCustomStores functionality Signed-off-by: Keith Mattix II --- pkg/k8s/informers/informers.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/k8s/informers/informers.go b/pkg/k8s/informers/informers.go index 9afc82ffac..399e52d37a 100644 --- a/pkg/k8s/informers/informers.go +++ b/pkg/k8s/informers/informers.go @@ -87,9 +87,11 @@ func NewInformerCollection(meshName string, stop <-chan struct{}, opts ...Inform return ic, nil } -// WithCustomStores sets the shared store for the InformerCollection to reference. -// This store will be used instead of each informer's store. This should typically -// only be used in tests +// WithCustomStores provides the InformerCollection an set of `cache.Store`s indexed +// by InformerKey. This functionality was added for the express purpose of testing +// flexibility since the alternative often leads to flaky tests and race conditions +// between the time an object is added to a fake clientset and when that object +// is actually added to the informer `cache.Store`. func WithCustomStores(stores map[InformerKey]cache.Store) InformerCollectionOption { return func(ic *InformerCollection) { ic.customStores = stores From be516de5467d6aae7819a63512b4117f6b643c26 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Thu, 9 Jun 2022 11:57:14 -0500 Subject: [PATCH 6/8] Infer selected informers based on client options Signed-off-by: Keith Mattix II --- pkg/k8s/client_test.go | 24 ++++----- pkg/k8s/informers/informers.go | 90 +++++++++++++++++++++++++--------- pkg/k8s/informers/types.go | 2 +- pkg/smi/client_test.go | 7 --- 4 files changed, 79 insertions(+), 44 deletions(-) diff --git a/pkg/k8s/client_test.go b/pkg/k8s/client_test.go index 740b2d2f48..4f31fa3240 100644 --- a/pkg/k8s/client_test.go +++ b/pkg/k8s/client_test.go @@ -120,7 +120,7 @@ func TestIsMonitoredNamespace(t *testing.T) { informers.InformerKeyNamespace: store, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) c := newClient(ic, nil, nil) _ = store.Add(tc.namespace) @@ -173,7 +173,7 @@ func TestGetNamespace(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyNamespace: store, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) c := newClient(ic, nil, nil) _ = store.Add(tc.namespace) @@ -230,7 +230,7 @@ func TestListMonitoredNamespaces(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyNamespace: store, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) c := newClient(ic, nil, nil) for _, ns := range tc.namespaces { @@ -299,7 +299,7 @@ func TestGetService(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyService: store, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) c := newClient(ic, nil, nil) _ = store.Add(tc.service) @@ -374,7 +374,7 @@ func TestListServices(t *testing.T) { informers.InformerKeyNamespace: nsStore, informers.InformerKeyService: svcStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) c := newClient(ic, nil, nil) _ = nsStore.Add(tc.namespace) @@ -449,7 +449,7 @@ func TestListServiceAccounts(t *testing.T) { informers.InformerKeyNamespace: nsStore, informers.InformerKeyServiceAccount: svcAccountStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) c := newClient(ic, nil, nil) _ = nsStore.Add(tc.namespace) @@ -524,7 +524,7 @@ func TestListPods(t *testing.T) { informers.InformerKeyNamespace: nsStore, informers.InformerKeyPod: pStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) c := newClient(ic, nil, nil) _ = nsStore.Add(tc.namespace) @@ -588,7 +588,7 @@ func TestGetEndpoints(t *testing.T) { stores := map[informers.InformerKey]cache.Store{ informers.InformerKeyEndpoints: eptsStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) c := newClient(ic, nil, nil) _ = eptsStore.Add(tc.endpoints) @@ -713,7 +713,7 @@ func TestListServiceIdentitiesForService(t *testing.T) { informers.InformerKeyPod: pStore, informers.InformerKeyService: svcStore, } - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) c := newClient(ic, nil, nil) _ = nsStore.Add(tc.namespace) @@ -887,7 +887,7 @@ func TestUpdateStatus(t *testing.T) { a := tassert.New(t) kubeClient := testclient.NewSimpleClientset() policyClient := fakePolicyClient.NewSimpleClientset(tc.existingResource.(runtime.Object)) - ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(kubeClient), informers.WithPolicyClient(policyClient), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(kubeClient), informers.WithPolicyClient(policyClient)) a.Nil(err) c := NewKubernetesController(ic, policyClient, nil) _, err = c.UpdateStatus(tc.updatedResource) @@ -1206,7 +1206,7 @@ func TestK8sServicesToMeshServices(t *testing.T) { fakeClient := testclient.NewSimpleClientset(tc.svcEndpoints...) stop := make(chan struct{}) - ic, err := informers.NewInformerCollection(testMeshName, stop, informers.WithKubeClient(fakeClient), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, stop, informers.WithKubeClient(fakeClient)) assert.Nil(err) kubeController := NewKubernetesController(ic, nil, nil) @@ -1244,7 +1244,7 @@ func TestGetPodForProxy(t *testing.T) { tests.NewPodFixture(namespace, "pod-2", tests.BookstoreServiceAccountName, someOthePodLabels), ) - ic, err := informers.NewInformerCollection(testMeshName, stop, informers.WithKubeClient(kubeClient), informers.WithSelectedInformers(k8sInformerKeys...)) + ic, err := informers.NewInformerCollection(testMeshName, stop, informers.WithKubeClient(kubeClient)) assert.Nil(err) kubeController := NewKubernetesController(ic, nil, messaging.NewBroker(nil)) diff --git a/pkg/k8s/informers/informers.go b/pkg/k8s/informers/informers.go index 399e52d37a..04b23e1b06 100644 --- a/pkg/k8s/informers/informers.go +++ b/pkg/k8s/informers/informers.go @@ -12,6 +12,35 @@ import ( policyClientset "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned" ) +var ( + k8sInformerKeys = []InformerKey{ + InformerKeyNamespace, + InformerKeyService, + InformerKeyServiceAccount, + InformerKeyPod, + InformerKeyEndpoints, + } + + smiInformerKeys = []InformerKey{ + InformerKeyTrafficSplit, + InformerKeyTrafficTarget, + InformerKeyHTTPRouteGroup, + InformerKeyTCPRoute, + } + + configInformerKeys = []InformerKey{ + InformerKeyMeshConfig, + InformerKeyMeshRootCertificate, + } + + policyInformerKeys = []InformerKey{ + InformerKeyEgress, + InformerKeyIngressBackend, + InformerKeyUpstreamTrafficSetting, + InformerKeyRetry, + } +) + // InformerCollectionOption is a function that modifies an informer collection type InformerCollectionOption func(*InformerCollection) @@ -56,26 +85,26 @@ func NewInformerCollection(meshName string, stop <-chan struct{}, opts ...Inform if len(ic.selectedInformers) == 0 { // Initialize all informers - ic.selectedInformers = []InformerKey{ - InformerKeyNamespace, - InformerKeyService, - InformerKeyPod, - InformerKeyEndpoints, - InformerKeyServiceAccount, - InformerKeyTrafficSplit, - InformerKeyTrafficTarget, - InformerKeyHTTPRouteGroup, - InformerKeyTCPRoute, - InformerKeyMeshConfig, - InformerKeyMeshRootCertificate, - InformerKeyEgress, - InformerKeyIngressBackend, - InformerKeyUpstreamTrafficSetting, - InformerKeyRetry, + ic.selectedInformers = map[InformerKey]struct{}{ + InformerKeyNamespace: {}, + InformerKeyService: {}, + InformerKeyPod: {}, + InformerKeyEndpoints: {}, + InformerKeyServiceAccount: {}, + InformerKeyTrafficSplit: {}, + InformerKeyTrafficTarget: {}, + InformerKeyHTTPRouteGroup: {}, + InformerKeyTCPRoute: {}, + InformerKeyMeshConfig: {}, + InformerKeyMeshRootCertificate: {}, + InformerKeyEgress: {}, + InformerKeyIngressBackend: {}, + InformerKeyUpstreamTrafficSetting: {}, + InformerKeyRetry: {}, } } - for _, key := range ic.selectedInformers { + for key := range ic.selectedInformers { informerInitHandlerMap[key]() } @@ -98,17 +127,15 @@ func WithCustomStores(stores map[InformerKey]cache.Store) InformerCollectionOpti } } -// WithSelectedInformers sets the selected informers for the InformerCollection -func WithSelectedInformers(selectedInformers ...InformerKey) InformerCollectionOption { - return func(ic *InformerCollection) { - ic.selectedInformers = selectedInformers - } -} - // WithKubeClient sets the kubeClient for the InformerCollection func WithKubeClient(kubeClient kubernetes.Interface) InformerCollectionOption { return func(ic *InformerCollection) { ic.kubeClient = kubeClient + + // select the k8s informers + for _, key := range k8sInformerKeys { + ic.selectedInformers[key] = struct{}{} + } } } @@ -118,6 +145,11 @@ func WithSMIClients(smiTrafficSplitClient smiTrafficSplitClient.Interface, smiTr ic.smiTrafficSplitClient = smiTrafficSplitClient ic.smiTrafficSpecClient = smiTrafficSpecClient ic.smiAccessClient = smiAccessClient + + // select the SMI informers + for _, key := range smiInformerKeys { + ic.selectedInformers[key] = struct{}{} + } } } @@ -125,6 +157,11 @@ func WithSMIClients(smiTrafficSplitClient smiTrafficSplitClient.Interface, smiTr func WithConfigClient(configClient configClientset.Interface) InformerCollectionOption { return func(ic *InformerCollection) { ic.configClient = configClient + + // select the config informers + for _, key := range configInformerKeys { + ic.selectedInformers[key] = struct{}{} + } } } @@ -132,6 +169,11 @@ func WithConfigClient(configClient configClientset.Interface) InformerCollection func WithPolicyClient(policyClient policyClientset.Interface) InformerCollectionOption { return func(ic *InformerCollection) { ic.policyClient = policyClient + + // select the policy informers + for _, key := range policyInformerKeys { + ic.selectedInformers[key] = struct{}{} + } } } diff --git a/pkg/k8s/informers/types.go b/pkg/k8s/informers/types.go index 6ddb927680..3eb30919cc 100644 --- a/pkg/k8s/informers/types.go +++ b/pkg/k8s/informers/types.go @@ -80,7 +80,7 @@ type InformerCollection struct { smiAccessClient smiTrafficAccessClient.Interface configClient configClientset.Interface policyClient policyClientset.Interface - selectedInformers []InformerKey + selectedInformers map[InformerKey]struct{} customStores map[InformerKey]cache.Store } diff --git a/pkg/smi/client_test.go b/pkg/smi/client_test.go index f6a2ca13cd..a03d1aa81f 100644 --- a/pkg/smi/client_test.go +++ b/pkg/smi/client_test.go @@ -51,13 +51,6 @@ func bootstrapClient(stop chan struct{}) (*client, *fakeKubeClientSet, error) { informerCollection, err := informers.NewInformerCollection(meshName, stop, informers.WithKubeClient(kubeClient), informers.WithSMIClients(smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet), - informers.WithSelectedInformers( - informers.InformerKeyNamespace, - informers.InformerKeyService, - informers.InformerKeyServiceAccount, - informers.InformerKeyPod, - informers.InformerKeyEndpoints, - ), ) if err != nil { From a2b6ed32b8312c9052fb71565bd76027713c5f55 Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Thu, 9 Jun 2022 12:34:39 -0500 Subject: [PATCH 7/8] Resolve PR comments Signed-off-by: Keith Mattix II --- pkg/k8s/informers/informers.go | 11 ++++++----- pkg/k8s/informers/init.go | 1 - 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/k8s/informers/informers.go b/pkg/k8s/informers/informers.go index 04b23e1b06..4b44f889e5 100644 --- a/pkg/k8s/informers/informers.go +++ b/pkg/k8s/informers/informers.go @@ -47,17 +47,18 @@ type InformerCollectionOption func(*InformerCollection) // NewInformerCollection creates a new InformerCollection func NewInformerCollection(meshName string, stop <-chan struct{}, opts ...InformerCollectionOption) (*InformerCollection, error) { ic := &InformerCollection{ - meshName: meshName, - informers: map[InformerKey]*informer{}, + meshName: meshName, + informers: map[InformerKey]*informer{}, + selectedInformers: map[InformerKey]struct{}{}, } + // Execute all of the given options (e.g. set clients, set custom stores, etc.) for _, opt := range opts { if opt != nil { opt(ic) } } - // Initialize informers informerInitHandlerMap := map[InformerKey]informerInit{ // Kubernetes InformerKeyNamespace: ic.initNamespaceMonitor, @@ -84,7 +85,7 @@ func NewInformerCollection(meshName string, stop <-chan struct{}, opts ...Inform } if len(ic.selectedInformers) == 0 { - // Initialize all informers + // Select all informers ic.selectedInformers = map[InformerKey]struct{}{ InformerKeyNamespace: {}, InformerKeyService: {}, @@ -191,7 +192,7 @@ func (ic *InformerCollection) run(stop <-chan struct{}) error { continue } - go informer.Run(make(chan struct{})) + go informer.Run(stop) names = append(names, string(name)) log.Info().Msgf("Waiting for %s informer cache sync...", name) hasSynced = append(hasSynced, informer.HasSynced) diff --git a/pkg/k8s/informers/init.go b/pkg/k8s/informers/init.go index 6c4c177a2d..af5c7ca3f9 100644 --- a/pkg/k8s/informers/init.go +++ b/pkg/k8s/informers/init.go @@ -41,7 +41,6 @@ func (ic *InformerCollection) initNamespaceMonitor() { if customStore != nil { informer.customStore = customStore } - ic.informers[InformerKeyNamespace] = informer } From 76531599574c49e548c39636e10b4e82c3ed01aa Mon Sep 17 00:00:00 2001 From: Keith Mattix II Date: Thu, 9 Jun 2022 12:51:50 -0500 Subject: [PATCH 8/8] Fix linting errors Signed-off-by: Keith Mattix II --- pkg/k8s/client_test.go | 9 +-------- pkg/k8s/informers/types.go | 3 +++ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/pkg/k8s/client_test.go b/pkg/k8s/client_test.go index 4f31fa3240..2f99247797 100644 --- a/pkg/k8s/client_test.go +++ b/pkg/k8s/client_test.go @@ -29,14 +29,7 @@ import ( ) var ( - testMeshName = "mesh" - k8sInformerKeys = []informers.InformerKey{ - informers.InformerKeyNamespace, - informers.InformerKeyService, - informers.InformerKeyServiceAccount, - informers.InformerKeyPod, - informers.InformerKeyEndpoints, - } + testMeshName = "mesh" ) func newFakeCacheStore(store map[string]interface{}, keyFunc func(obj interface{}) string) cache.Store { diff --git a/pkg/k8s/informers/types.go b/pkg/k8s/informers/types.go index 3eb30919cc..e93c6e5008 100644 --- a/pkg/k8s/informers/types.go +++ b/pkg/k8s/informers/types.go @@ -71,6 +71,9 @@ type informer struct { informer cache.SharedIndexInformer } +// InformerCollection is an abstraction around a set of informers +// initialized with the clients stored in its fields. This data +// type should only be passed around as a pointer type InformerCollection struct { informers map[InformerKey]*informer meshName string