diff --git a/cmd/osm-controller/osm-controller.go b/cmd/osm-controller/osm-controller.go index ad1a3d2b4b..66227b5bf2 100644 --- a/cmd/osm-controller/osm-controller.go +++ b/cmd/osm-controller/osm-controller.go @@ -14,6 +14,9 @@ import ( "time" "github.com/pkg/errors" + smiAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned" + smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned" + smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned" "github.com/spf13/pflag" admissionv1 "k8s.io/api/admissionregistration/v1" corev1 "k8s.io/api/core/v1" @@ -43,6 +46,7 @@ import ( "github.com/openservicemesh/osm/pkg/ingress" "github.com/openservicemesh/osm/pkg/k8s" "github.com/openservicemesh/osm/pkg/k8s/events" + "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/logger" "github.com/openservicemesh/osm/pkg/messaging" "github.com/openservicemesh/osm/pkg/metricsstore" @@ -151,6 +155,7 @@ func main() { } kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) policyClient := policyClientset.NewForConfigOrDie(kubeConfig) + configClient := configClientset.NewForConfigOrDie(kubeConfig) // Initialize the generic Kubernetes event recorder and associate it with the osm-controller pod resource controllerPod, err := getOSMControllerPod(kubeClient) @@ -176,17 +181,28 @@ func main() { msgBroker := messaging.NewBroker(stop) - // This component will be watching resources in the config.openservicemesh.io API group - cfg, err := configurator.NewConfigurator(configClientset.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmMeshConfigName, msgBroker) + smiTrafficSplitClientSet := smiTrafficSplitClient.NewForConfigOrDie(kubeConfig) + smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(kubeConfig) + smiTrafficTargetClientSet := smiAccessClient.NewForConfigOrDie(kubeConfig) + + informerCollection, err := informers.NewInformerCollection(meshName, stop, + informers.WithKubeClient(kubeClient), + informers.WithSMIClients(smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet), + informers.WithConfigClient(configClient), + informers.WithPolicyClient(policyClient), + ) if err != nil { - events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io") + events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection") } - k8sClient, err := k8s.NewKubernetesController(kubeClient, policyClient, meshName, stop, msgBroker) + // This component will be watching resources in the config.openservicemesh.io API group + cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker) if err != nil { - events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes Controller") + events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io") } + k8sClient := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker) + meshSpec, err := smi.NewMeshSpecClient(kubeConfig, osmNamespace, k8sClient, stop, msgBroker) if err != nil { events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating MeshSpec") @@ -215,16 +231,16 @@ func main() { } } - var configClient config.Controller + var 
multiclusterConfigClient config.Controller if cfg.GetFeatureFlags().EnableMulticlusterMode { - if configClient, err = config.NewConfigController(kubeConfig, k8sClient, stop, msgBroker); err != nil { + if multiclusterConfigClient, err = config.NewConfigController(kubeConfig, k8sClient, stop, msgBroker); err != nil { events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes config client") } } - // A nil configClient is passed in if multi cluster mode is not enabled. - kubeProvider := kube.NewClient(k8sClient, configClient, cfg) + // A nil multiclusterConfigClient is passed in if multi cluster mode is not enabled. + kubeProvider := kube.NewClient(k8sClient, multiclusterConfigClient, cfg) endpointsProviders := []endpoint.Provider{kubeProvider} serviceProviders := []service.Provider{kubeProvider} diff --git a/cmd/osm-injector/osm-injector.go b/cmd/osm-injector/osm-injector.go index 3a109080e6..2df4bd83a8 100644 --- a/cmd/osm-injector/osm-injector.go +++ b/cmd/osm-injector/osm-injector.go @@ -11,6 +11,9 @@ import ( "time" "github.com/pkg/errors" + smiAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned" + smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned" + smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned" "github.com/spf13/pflag" admissionv1 "k8s.io/api/admissionregistration/v1" corev1 "k8s.io/api/core/v1" @@ -32,6 +35,7 @@ import ( "github.com/openservicemesh/osm/pkg/injector" "github.com/openservicemesh/osm/pkg/k8s" "github.com/openservicemesh/osm/pkg/k8s/events" + "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/logger" "github.com/openservicemesh/osm/pkg/messaging" "github.com/openservicemesh/osm/pkg/metricsstore" @@ -139,6 +143,7 @@ func main() { } kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) policyClient := policyClientset.NewForConfigOrDie(kubeConfig) + configClient := configClientset.NewForConfigOrDie(kubeConfig) // Initialize the generic Kubernetes event recorder and associate it with the osm-injector pod resource injectorPod, err := getInjectorPod(kubeClient) @@ -172,6 +177,21 @@ func main() { msgBroker := messaging.NewBroker(stop) + smiTrafficSplitClientSet := smiTrafficSplitClient.NewForConfigOrDie(kubeConfig) + smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(kubeConfig) + smiTrafficTargetClientSet := smiAccessClient.NewForConfigOrDie(kubeConfig) + + informerCollection, err := informers.NewInformerCollection(meshName, stop, + informers.WithKubeClient(kubeClient), + informers.WithSMIClients(smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet), + informers.WithConfigClient(configClient), + informers.WithPolicyClient(policyClient), + ) + + if err != nil { + events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection") + } + // Initialize Configurator to watch resources in the config.openservicemesh.io API group cfg, err := configurator.NewConfigurator(configClientset.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmMeshConfigName, msgBroker) if err != nil { @@ -179,10 +199,7 @@ func main() { } // Initialize kubernetes.Controller to watch kubernetes resources - kubeController, err := k8s.NewKubernetesController(kubeClient, policyClient, meshName, stop, msgBroker, k8s.Namespaces) - if err != nil { 
- events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes Controller") - } + kubeController := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker, k8s.Namespaces) certOpts, err := getCertOptions() if err != nil { diff --git a/pkg/k8s/client.go b/pkg/k8s/client.go index 424aa09395..ea67a7970a 100644 --- a/pkg/k8s/client.go +++ b/pkg/k8s/client.go @@ -11,40 +11,33 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "k8s.io/utils/pointer" + "github.com/openservicemesh/osm/pkg/announcements" policyv1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1" "github.com/openservicemesh/osm/pkg/envoy" "github.com/openservicemesh/osm/pkg/errcode" policyv1alpha1Client "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned" "github.com/openservicemesh/osm/pkg/messaging" - "github.com/openservicemesh/osm/pkg/announcements" "github.com/openservicemesh/osm/pkg/constants" "github.com/openservicemesh/osm/pkg/identity" + osminformers "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/service" ) // NewKubernetesController returns a new kubernetes.Controller which means to provide access to locally-cached k8s resources -func NewKubernetesController(kubeClient kubernetes.Interface, policyClient policyv1alpha1Client.Interface, meshName string, - stop <-chan struct{}, msgBroker *messaging.Broker, selectInformers ...InformerKey) (Controller, error) { - return newClient(kubeClient, policyClient, meshName, stop, msgBroker, selectInformers...) +func NewKubernetesController(informerCollection *osminformers.InformerCollection, policyClient policyv1alpha1Client.Interface, msgBroker *messaging.Broker, selectInformers ...InformerKey) Controller { + return newClient(informerCollection, policyClient, msgBroker, selectInformers...) 
} -func newClient(kubeClient kubernetes.Interface, policyClient policyv1alpha1Client.Interface, meshName string, - stop <-chan struct{}, msgBroker *messaging.Broker, selectInformers ...InformerKey) (*client, error) { +func newClient(informerCollection *osminformers.InformerCollection, policyClient policyv1alpha1Client.Interface, msgBroker *messaging.Broker, selectInformers ...InformerKey) *client { // Initialize client object c := &client{ - kubeClient: kubeClient, - policyClient: policyClient, - meshName: meshName, - informers: informerCollection{}, + informers: informerCollection, msgBroker: msgBroker, + policyClient: policyClient, } // Initialize informers @@ -65,35 +58,18 @@ func newClient(kubeClient kubernetes.Interface, policyClient policyv1alpha1Clien informerInitHandlerMap[informer]() } - if err := c.run(stop); err != nil { - log.Error().Err(err).Msg("Could not start Kubernetes Namespaces client") - return nil, err - } - - return c, nil + return c } // Initializes Namespace monitoring func (c *client) initNamespaceMonitor() { - monitorNamespaceLabel := map[string]string{constants.OSMKubeResourceMonitorAnnotation: c.meshName} - - labelSelector := fields.SelectorFromSet(monitorNamespaceLabel).String() - option := informers.WithTweakListOptions(func(opt *metav1.ListOptions) { - opt.LabelSelector = labelSelector - }) - - informerFactory := informers.NewSharedInformerFactoryWithOptions(c.kubeClient, DefaultKubeEventResyncInterval, option) - - // Add informer - c.informers[Namespaces] = informerFactory.Core().V1().Namespaces().Informer() - // Add event handler to informer nsEventTypes := EventTypes{ Add: announcements.NamespaceAdded, Update: announcements.NamespaceUpdated, Delete: announcements.NamespaceDeleted, } - c.informers[Namespaces].AddEventHandler(GetEventHandlerFuncs(nil, nsEventTypes, c.msgBroker)) + c.informers.AddEventHandler(osminformers.InformerKeyNamespace, GetEventHandlerFuncs(nil, nsEventTypes, c.msgBroker)) } // Function to filter K8s meta Objects by OSM's isMonitoredNamespace @@ -107,94 +83,52 @@ func (c *client) shouldObserve(obj interface{}) bool { // Initializes Service monitoring func (c *client) initServicesMonitor() { - informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval) - c.informers[Services] = informerFactory.Core().V1().Services().Informer() - svcEventTypes := EventTypes{ Add: announcements.ServiceAdded, Update: announcements.ServiceUpdated, Delete: announcements.ServiceDeleted, } - c.informers[Services].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker)) + c.informers.AddEventHandler(osminformers.InformerKeyService, GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker)) } // Initializes Service Account monitoring func (c *client) initServiceAccountsMonitor() { - informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval) - c.informers[ServiceAccounts] = informerFactory.Core().V1().ServiceAccounts().Informer() - svcEventTypes := EventTypes{ Add: announcements.ServiceAccountAdded, Update: announcements.ServiceAccountUpdated, Delete: announcements.ServiceAccountDeleted, } - c.informers[ServiceAccounts].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker)) + c.informers.AddEventHandler(osminformers.InformerKeyServiceAccount, GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker)) } func (c *client) initPodMonitor() { - informerFactory := informers.NewSharedInformerFactory(c.kubeClient, 
DefaultKubeEventResyncInterval) - c.informers[Pods] = informerFactory.Core().V1().Pods().Informer() - podEventTypes := EventTypes{ Add: announcements.PodAdded, Update: announcements.PodUpdated, Delete: announcements.PodDeleted, } - c.informers[Pods].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, podEventTypes, c.msgBroker)) + c.informers.AddEventHandler(osminformers.InformerKeyPod, GetEventHandlerFuncs(c.shouldObserve, podEventTypes, c.msgBroker)) } func (c *client) initEndpointMonitor() { - informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval) - c.informers[Endpoints] = informerFactory.Core().V1().Endpoints().Informer() - eptEventTypes := EventTypes{ Add: announcements.EndpointAdded, Update: announcements.EndpointUpdated, Delete: announcements.EndpointDeleted, } - c.informers[Endpoints].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, eptEventTypes, c.msgBroker)) -} - -func (c *client) run(stop <-chan struct{}) error { - log.Info().Msg("Namespace controller client started") - var hasSynced []cache.InformerSynced - var names []string - - if c.informers == nil { - return errInitInformers - } - - for name, informer := range c.informers { - if informer == nil { - continue - } - - go informer.Run(stop) - names = append(names, (string)(name)) - log.Info().Msgf("Waiting for %s informer cache sync...", name) - hasSynced = append(hasSynced, informer.HasSynced) - } - - if !cache.WaitForCacheSync(stop, hasSynced...) { - return errSyncingCaches - } - - log.Info().Msgf("Caches for %v synced successfully", names) - - return nil + c.informers.AddEventHandler(osminformers.InformerKeyEndpoints, GetEventHandlerFuncs(c.shouldObserve, eptEventTypes, c.msgBroker)) } // IsMonitoredNamespace returns a boolean indicating if the namespace is among the list of monitored namespaces func (c client) IsMonitoredNamespace(namespace string) bool { - _, exists, _ := c.informers[Namespaces].GetStore().GetByKey(namespace) - return exists + return c.informers.IsMonitoredNamespace(namespace) } // ListMonitoredNamespaces returns all namespaces that the mesh is monitoring. 
func (c client) ListMonitoredNamespaces() ([]string, error) { var namespaces []string - for _, ns := range c.informers[Namespaces].GetStore().List() { + for _, ns := range c.informers.List(osminformers.InformerKeyNamespace) { namespace, ok := ns.(*corev1.Namespace) if !ok { log.Error().Err(errListingNamespaces).Msg("Failed to list monitored namespaces") @@ -208,7 +142,7 @@ func (c client) ListMonitoredNamespaces() ([]string, error) { // GetService retrieves the Kubernetes Services resource for the given MeshService func (c client) GetService(svc service.MeshService) *corev1.Service { // client-go cache uses / as key - svcIf, exists, err := c.informers[Services].GetStore().GetByKey(svc.NamespacedKey()) + svcIf, exists, err := c.informers.GetByKey(osminformers.InformerKeyService, svc.NamespacedKey()) if exists && err == nil { svc := svcIf.(*corev1.Service) return svc @@ -220,7 +154,7 @@ func (c client) GetService(svc service.MeshService) *corev1.Service { func (c client) ListServices() []*corev1.Service { var services []*corev1.Service - for _, serviceInterface := range c.informers[Services].GetStore().List() { + for _, serviceInterface := range c.informers.List(osminformers.InformerKeyService) { svc := serviceInterface.(*corev1.Service) if !c.IsMonitoredNamespace(svc.Namespace) { @@ -235,7 +169,7 @@ func (c client) ListServices() []*corev1.Service { func (c client) ListServiceAccounts() []*corev1.ServiceAccount { var serviceAccounts []*corev1.ServiceAccount - for _, serviceInterface := range c.informers[ServiceAccounts].GetStore().List() { + for _, serviceInterface := range c.informers.List(osminformers.InformerKeyServiceAccount) { sa := serviceInterface.(*corev1.ServiceAccount) if !c.IsMonitoredNamespace(sa.Namespace) { @@ -248,7 +182,7 @@ func (c client) ListServiceAccounts() []*corev1.ServiceAccount { // GetNamespace returns a Namespace resource if found, nil otherwise. func (c client) GetNamespace(ns string) *corev1.Namespace { - nsIf, exists, err := c.informers[Namespaces].GetStore().GetByKey(ns) + nsIf, exists, err := c.informers.GetByKey(osminformers.InformerKeyNamespace, ns) if exists && err == nil { ns := nsIf.(*corev1.Namespace) return ns @@ -262,7 +196,7 @@ func (c client) GetNamespace(ns string) *corev1.Namespace { func (c client) ListPods() []*corev1.Pod { var pods []*corev1.Pod - for _, podInterface := range c.informers[Pods].GetStore().List() { + for _, podInterface := range c.informers.List(osminformers.InformerKeyPod) { pod := podInterface.(*corev1.Pod) if !c.IsMonitoredNamespace(pod.Namespace) { continue @@ -275,7 +209,7 @@ func (c client) ListPods() []*corev1.Pod { // GetEndpoints returns the endpoint for a given service, otherwise returns nil if not found // or error if the API errored out. 
func (c client) GetEndpoints(svc service.MeshService) (*corev1.Endpoints, error) { - ep, exists, err := c.informers[Endpoints].GetStore().GetByKey(svc.NamespacedKey()) + ep, exists, err := c.informers.GetByKey(osminformers.InformerKeyEndpoints, svc.NamespacedKey()) if err != nil { return nil, err } diff --git a/pkg/k8s/client_test.go b/pkg/k8s/client_test.go index 6fb81b49ee..2f99247797 100644 --- a/pkg/k8s/client_test.go +++ b/pkg/k8s/client_test.go @@ -1,11 +1,11 @@ package k8s import ( + "fmt" "testing" "github.com/golang/mock/gomock" "github.com/google/uuid" - "github.com/stretchr/testify/assert" tassert "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -13,6 +13,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" testclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" "k8s.io/utils/pointer" policyv1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1" @@ -23,6 +24,7 @@ import ( "github.com/openservicemesh/osm/pkg/constants" "github.com/openservicemesh/osm/pkg/identity" + "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/service" ) @@ -30,6 +32,44 @@ var ( testMeshName = "mesh" ) +func newFakeCacheStore(store map[string]interface{}, keyFunc func(obj interface{}) string) cache.Store { + return &cache.FakeCustomStore{ + AddFunc: func(obj interface{}) error { store[keyFunc(obj)] = obj; return nil }, + UpdateFunc: func(obj interface{}) error { store[keyFunc(obj)] = obj; return nil }, + DeleteFunc: func(obj interface{}) error { delete(store, keyFunc(obj)); return nil }, + ListFunc: func() []interface{} { + var objs []interface{} + for _, obj := range store { + objs = append(objs, obj) + } + return objs + }, + ListKeysFunc: func() []string { + var keys []string + for key := range store { + keys = append(keys, key) + } + return keys + }, + GetFunc: func(obj interface{}) (item interface{}, exists bool, err error) { + item, ok := store[keyFunc(obj)] + if !ok { + return nil, false, nil + } + return item, true, nil + }, + GetByKeyFunc: func(key string) (item interface{}, exists bool, err error) { + item, ok := store[key] + if !ok { + return nil, false, nil + } + return item, true, nil + }, + ResyncFunc: func() error { return nil }, + ReplaceFunc: func(list []interface{}, resourceVersion string) error { return nil }, + } +} + func TestIsMonitoredNamespace(t *testing.T) { testCases := []struct { name string @@ -59,12 +99,24 @@ func TestIsMonitoredNamespace(t *testing.T) { }, } + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + namespaceStore := map[string]interface{}{} + store := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: store, + } + + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + _ = store.Add(tc.namespace) actual := c.IsMonitoredNamespace(tc.ns) a.Equal(tc.expected, actual) @@ -101,12 +153,23 @@ func TestGetNamespace(t *testing.T) { }, } + namespaceKeyFunc := 
func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + namespaceStore := map[string]interface{}{} + store := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: store, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + _ = store.Add(tc.namespace) actual := c.GetNamespace(tc.ns) if tc.expected { @@ -147,13 +210,24 @@ func TestListMonitoredNamespaces(t *testing.T) { }, } + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + namespaceStore := map[string]interface{}{} + store := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: store, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) + c := newClient(ic, nil, nil) for _, ns := range tc.namespaces { - _ = c.informers[Namespaces].GetStore().Add(ns) + _ = store.Add(ns) } actual, err := c.ListMonitoredNamespaces() @@ -205,12 +279,23 @@ func TestGetService(t *testing.T) { }, } + serviceKeyFunc := func(obj interface{}) string { + svc := obj.(*corev1.Service) + return fmt.Sprintf("%s/%s", svc.GetNamespace(), svc.GetName()) + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + serviceStore := map[string]interface{}{} + store := newFakeCacheStore(serviceStore, serviceKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyService: store, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) - _ = c.informers[Services].GetStore().Add(tc.service) + c := newClient(ic, nil, nil) + _ = store.Add(tc.service) actual := c.GetService(tc.svc) if tc.expected { @@ -261,15 +346,34 @@ func TestListServices(t *testing.T) { }, } + serviceKeyFunc := func(obj interface{}) string { + svc := obj.(*corev1.Service) + return fmt.Sprintf("%s/%s", svc.GetNamespace(), svc.GetName()) + } + + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + serviceStore := map[string]interface{}{} + namespaceStore := map[string]interface{}{} + nsStore := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + svcStore := newFakeCacheStore(serviceStore, serviceKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: nsStore, + 
informers.InformerKeyService: svcStore, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + _ = nsStore.Add(tc.namespace) for _, s := range tc.services { - _ = c.informers[Services].GetStore().Add(s) + _ = svcStore.Add(s) } actual := c.ListServices() @@ -317,15 +421,34 @@ func TestListServiceAccounts(t *testing.T) { }, } + serviceAccountKeyFunc := func(obj interface{}) string { + svcAccount := obj.(*corev1.ServiceAccount) + return fmt.Sprintf("%s/%s", svcAccount.GetNamespace(), svcAccount.GetName()) + } + + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + serviceStore := map[string]interface{}{} + namespaceStore := map[string]interface{}{} + nsStore := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + svcAccountStore := newFakeCacheStore(serviceStore, serviceAccountKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: nsStore, + informers.InformerKeyServiceAccount: svcAccountStore, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + _ = nsStore.Add(tc.namespace) for _, s := range tc.sa { - _ = c.informers[ServiceAccounts].GetStore().Add(s) + _ = svcAccountStore.Add(s) } actual := c.ListServiceAccounts() @@ -373,15 +496,34 @@ func TestListPods(t *testing.T) { }, } + podKeyFunc := func(obj interface{}) string { + pod := obj.(*corev1.Pod) + return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()) + } + + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + podStore := map[string]interface{}{} + namespaceStore := map[string]interface{}{} + nsStore := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + pStore := newFakeCacheStore(podStore, podKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: nsStore, + informers.InformerKeyPod: pStore, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + _ = nsStore.Add(tc.namespace) - for _, s := range tc.pods { - _ = c.informers[Pods].GetStore().Add(s) + for _, p := range tc.pods { + _ = pStore.Add(p) } actual := c.ListPods() @@ -426,12 +568,23 @@ func TestGetEndpoints(t *testing.T) { }, } + endpointsKeyFunc := func(obj interface{}) string { + epts := obj.(*corev1.Endpoints) + return fmt.Sprintf("%s/%s", epts.GetNamespace(), epts.GetName()) + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := 
tassert.New(t) + endpointsStore := map[string]interface{}{} + eptsStore := newFakeCacheStore(endpointsStore, endpointsKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyEndpoints: eptsStore, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) - _ = c.informers[Endpoints].GetStore().Add(tc.endpoints) + c := newClient(ic, nil, nil) + _ = eptsStore.Add(tc.endpoints) actual, err := c.GetEndpoints(tc.svc) a.Nil(err) @@ -524,16 +677,43 @@ func TestListServiceIdentitiesForService(t *testing.T) { }, } + namespaceKeyFunc := func(obj interface{}) string { + ns := obj.(*corev1.Namespace) + return ns.GetName() + } + + podKeyFunc := func(obj interface{}) string { + pod := obj.(*corev1.Pod) + return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()) + } + + svcKeyFunc := func(obj interface{}) string { + svc := obj.(*corev1.Service) + return fmt.Sprintf("%s/%s", svc.GetNamespace(), svc.GetName()) + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - a := assert.New(t) - c, err := newClient(testclient.NewSimpleClientset(), nil, testMeshName, nil, nil) + a := tassert.New(t) + podStore := map[string]interface{}{} + namespaceStore := map[string]interface{}{} + serviceStore := map[string]interface{}{} + nsStore := newFakeCacheStore(namespaceStore, namespaceKeyFunc) + pStore := newFakeCacheStore(podStore, podKeyFunc) + svcStore := newFakeCacheStore(serviceStore, svcKeyFunc) + stores := map[informers.InformerKey]cache.Store{ + informers.InformerKeyNamespace: nsStore, + informers.InformerKeyPod: pStore, + informers.InformerKeyService: svcStore, + } + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(testclient.NewSimpleClientset()), informers.WithCustomStores(stores)) a.Nil(err) - _ = c.informers[Namespaces].GetStore().Add(tc.namespace) + c := newClient(ic, nil, nil) + _ = nsStore.Add(tc.namespace) for _, p := range tc.pods { - _ = c.informers[Pods].GetStore().Add(p) + _ = pStore.Add(p) } - _ = c.informers[Services].GetStore().Add(tc.service) + _ = svcStore.Add(tc.service) actual, err := c.ListServiceIdentitiesForService(tc.svc) a.Equal(tc.expectErr, err != nil) @@ -700,10 +880,9 @@ func TestUpdateStatus(t *testing.T) { a := tassert.New(t) kubeClient := testclient.NewSimpleClientset() policyClient := fakePolicyClient.NewSimpleClientset(tc.existingResource.(runtime.Object)) - - c, err := NewKubernetesController(kubeClient, policyClient, testMeshName, make(chan struct{}), nil) + ic, err := informers.NewInformerCollection(testMeshName, nil, informers.WithKubeClient(kubeClient), informers.WithPolicyClient(policyClient)) a.Nil(err) - + c := NewKubernetesController(ic, policyClient, nil) _, err = c.UpdateStatus(tc.updatedResource) a.Equal(tc.expectErr, err != nil) }) @@ -1020,8 +1199,10 @@ func TestK8sServicesToMeshServices(t *testing.T) { fakeClient := testclient.NewSimpleClientset(tc.svcEndpoints...) 
stop := make(chan struct{}) - kubeController, err := NewKubernetesController(fakeClient, nil, testMeshName, stop, nil) + ic, err := informers.NewInformerCollection(testMeshName, stop, informers.WithKubeClient(fakeClient)) assert.Nil(err) + + kubeController := NewKubernetesController(ic, nil, nil) assert.NotNil(kubeController) actual := ServiceToMeshServices(kubeController, tc.svc) @@ -1056,9 +1237,11 @@ func TestGetPodForProxy(t *testing.T) { tests.NewPodFixture(namespace, "pod-2", tests.BookstoreServiceAccountName, someOthePodLabels), ) - kubeController, err := NewKubernetesController(kubeClient, nil, testMeshName, stop, messaging.NewBroker(nil)) + ic, err := informers.NewInformerCollection(testMeshName, stop, informers.WithKubeClient(kubeClient)) assert.Nil(err) + kubeController := NewKubernetesController(ic, nil, messaging.NewBroker(nil)) + testCases := []struct { name string pod *corev1.Pod diff --git a/pkg/k8s/errors.go b/pkg/k8s/errors.go index 3c0b520c83..b170b32e90 100644 --- a/pkg/k8s/errors.go +++ b/pkg/k8s/errors.go @@ -3,8 +3,6 @@ package k8s import "github.com/pkg/errors" var ( - errSyncingCaches = errors.New("Failed initial cache sync for Namespace informers") - errInitInformers = errors.New("Informer not initialized") errListingNamespaces = errors.New("Failed to list monitored namespaces") errServiceNotFound = errors.New("Service not found") diff --git a/pkg/k8s/informers/informers.go b/pkg/k8s/informers/informers.go new file mode 100644 index 0000000000..4b44f889e5 --- /dev/null +++ b/pkg/k8s/informers/informers.go @@ -0,0 +1,241 @@ +package informers + +import ( + "github.com/rs/zerolog/log" + smiTrafficAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned" + smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned" + smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned" + policyClientset "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned" +) + +var ( + k8sInformerKeys = []InformerKey{ + InformerKeyNamespace, + InformerKeyService, + InformerKeyServiceAccount, + InformerKeyPod, + InformerKeyEndpoints, + } + + smiInformerKeys = []InformerKey{ + InformerKeyTrafficSplit, + InformerKeyTrafficTarget, + InformerKeyHTTPRouteGroup, + InformerKeyTCPRoute, + } + + configInformerKeys = []InformerKey{ + InformerKeyMeshConfig, + InformerKeyMeshRootCertificate, + } + + policyInformerKeys = []InformerKey{ + InformerKeyEgress, + InformerKeyIngressBackend, + InformerKeyUpstreamTrafficSetting, + InformerKeyRetry, + } +) + +// InformerCollectionOption is a function that modifies an informer collection +type InformerCollectionOption func(*InformerCollection) + +// NewInformerCollection creates a new InformerCollection +func NewInformerCollection(meshName string, stop <-chan struct{}, opts ...InformerCollectionOption) (*InformerCollection, error) { + ic := &InformerCollection{ + meshName: meshName, + informers: map[InformerKey]*informer{}, + selectedInformers: map[InformerKey]struct{}{}, + } + + // Execute all of the given options (e.g. set clients, set custom stores, etc.) 
+	for _, opt := range opts {
+		if opt != nil {
+			opt(ic)
+		}
+	}
+
+	informerInitHandlerMap := map[InformerKey]informerInit{
+		// Kubernetes
+		InformerKeyNamespace:      ic.initNamespaceMonitor,
+		InformerKeyService:        ic.initServicesMonitor,
+		InformerKeyServiceAccount: ic.initServiceAccountsMonitor,
+		InformerKeyPod:            ic.initPodMonitor,
+		InformerKeyEndpoints:      ic.initEndpointMonitor,
+
+		// SMI
+		InformerKeyTrafficSplit:   ic.initTrafficSplitMonitor,
+		InformerKeyTrafficTarget:  ic.initTrafficTargetMonitor,
+		InformerKeyHTTPRouteGroup: ic.initHTTPRouteGroupMonitor,
+		InformerKeyTCPRoute:       ic.initTCPRouteMonitor,
+
+		// Config
+		InformerKeyMeshConfig:          ic.initMeshConfigMonitor,
+		InformerKeyMeshRootCertificate: ic.initMeshRootCertificateMonitor,
+
+		// Policy
+		InformerKeyEgress:                 ic.initEgressMonitor,
+		InformerKeyIngressBackend:         ic.initIngressBackendMonitor,
+		InformerKeyUpstreamTrafficSetting: ic.initUpstreamTrafficSettingMonitor,
+		InformerKeyRetry:                  ic.initRetryMonitor,
+	}
+
+	if len(ic.selectedInformers) == 0 {
+		// Select all informers
+		ic.selectedInformers = map[InformerKey]struct{}{
+			InformerKeyNamespace:              {},
+			InformerKeyService:                {},
+			InformerKeyPod:                    {},
+			InformerKeyEndpoints:              {},
+			InformerKeyServiceAccount:         {},
+			InformerKeyTrafficSplit:           {},
+			InformerKeyTrafficTarget:          {},
+			InformerKeyHTTPRouteGroup:         {},
+			InformerKeyTCPRoute:               {},
+			InformerKeyMeshConfig:             {},
+			InformerKeyMeshRootCertificate:    {},
+			InformerKeyEgress:                 {},
+			InformerKeyIngressBackend:         {},
+			InformerKeyUpstreamTrafficSetting: {},
+			InformerKeyRetry:                  {},
+		}
+	}
+
+	for key := range ic.selectedInformers {
+		informerInitHandlerMap[key]()
+	}
+
+	if err := ic.run(stop); err != nil {
+		log.Error().Err(err).Msg("Could not start informer collection")
+		return nil, err
+	}
+
+	return ic, nil
+}
+
+// WithCustomStores provides the InformerCollection with a set of `cache.Store`s indexed
+// by InformerKey. This functionality was added for the express purpose of testing
+// flexibility since the alternative often leads to flaky tests and race conditions
+// between the time an object is added to a fake clientset and when that object
+// is actually added to the informer `cache.Store`.
+func WithCustomStores(stores map[InformerKey]cache.Store) InformerCollectionOption {
+	return func(ic *InformerCollection) {
+		ic.customStores = stores
+	}
+}
+
+// WithKubeClient sets the kubeClient for the InformerCollection
+func WithKubeClient(kubeClient kubernetes.Interface) InformerCollectionOption {
+	return func(ic *InformerCollection) {
+		ic.kubeClient = kubeClient
+
+		// select the k8s informers
+		for _, key := range k8sInformerKeys {
+			ic.selectedInformers[key] = struct{}{}
+		}
+	}
+}
+
+// WithSMIClients sets the SMI clients for the InformerCollection
+func WithSMIClients(smiTrafficSplitClient smiTrafficSplitClient.Interface, smiTrafficSpecClient smiTrafficSpecClient.Interface, smiAccessClient smiTrafficAccessClient.Interface) InformerCollectionOption {
+	return func(ic *InformerCollection) {
+		ic.smiTrafficSplitClient = smiTrafficSplitClient
+		ic.smiTrafficSpecClient = smiTrafficSpecClient
+		ic.smiAccessClient = smiAccessClient
+
+		// select the SMI informers
+		for _, key := range smiInformerKeys {
+			ic.selectedInformers[key] = struct{}{}
+		}
+	}
+}
+
+// WithConfigClient sets the config client for the InformerCollection
+func WithConfigClient(configClient configClientset.Interface) InformerCollectionOption {
+	return func(ic *InformerCollection) {
+		ic.configClient = configClient
+
+		// select the config informers
+		for _, key := range configInformerKeys {
+			ic.selectedInformers[key] = struct{}{}
+		}
+	}
+}
+
+// WithPolicyClient sets the policy client for the InformerCollection
+func WithPolicyClient(policyClient policyClientset.Interface) InformerCollectionOption {
+	return func(ic *InformerCollection) {
+		ic.policyClient = policyClient
+
+		// select the policy informers
+		for _, key := range policyInformerKeys {
+			ic.selectedInformers[key] = struct{}{}
+		}
+	}
+}
+
+func (ic *InformerCollection) run(stop <-chan struct{}) error {
+	log.Info().Msg("InformerCollection started")
+	var hasSynced []cache.InformerSynced
+	var names []string
+
+	if ic.informers == nil {
+		return errInitInformers
+	}
+
+	for name, informer := range ic.informers {
+		if informer == nil {
+			continue
+		}
+
+		go informer.Run(stop)
+		names = append(names, string(name))
+		log.Info().Msgf("Waiting for %s informer cache sync...", name)
+		hasSynced = append(hasSynced, informer.HasSynced)
+	}
+
+	if !cache.WaitForCacheSync(stop, hasSynced...) {
+		return errSyncingCaches
+	}
+
+	log.Info().Msgf("Caches for %v synced successfully", names)
+
+	return nil
+}
+
+// AddEventHandler adds a handler to the informer indexed by the given InformerKey
+func (ic *InformerCollection) AddEventHandler(informerKey InformerKey, handler cache.ResourceEventHandler) {
+	i, ok := ic.informers[informerKey]
+	if !ok {
+		log.Info().Msgf("attempted to add event handler for nil informer %s", informerKey)
+		return
+	}
+
+	i.informer.AddEventHandler(handler)
+}
+
+// GetByKey retrieves an item (based on the given index) from the store of the informer indexed by the given InformerKey
+func (ic *InformerCollection) GetByKey(informerKey InformerKey, objectKey string) (interface{}, bool, error) {
+	informer, ok := ic.informers[informerKey]
+	if !ok {
+		// keithmattix: This is the silent failure option, but perhaps we want to return an error?
+ return nil, false, nil + } + + return informer.GetStore().GetByKey(objectKey) +} + +// List returns the contents of the store of the informer indexed by the given InformerKey +func (ic *InformerCollection) List(informerKey InformerKey) []interface{} { + informer, ok := ic.informers[informerKey] + if !ok { + // keithmattix: This is the silent failure option, but perhaps we want to return an error? + return nil + } + + return informer.GetStore().List() +} diff --git a/pkg/k8s/informers/init.go b/pkg/k8s/informers/init.go new file mode 100644 index 0000000000..af5c7ca3f9 --- /dev/null +++ b/pkg/k8s/informers/init.go @@ -0,0 +1,229 @@ +package informers + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/informers" + + smiAccessInformers "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/informers/externalversions" + smiTrafficSpecInformers "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/informers/externalversions" + smiTrafficSplitInformers "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/informers/externalversions" + + configInformers "github.com/openservicemesh/osm/pkg/gen/client/config/informers/externalversions" + policyInformers "github.com/openservicemesh/osm/pkg/gen/client/policy/informers/externalversions" + + "github.com/openservicemesh/osm/pkg/constants" +) + +// IsMonitoredNamespace returns a boolean indicating if the namespace is among the list of monitored namespaces +func (ic InformerCollection) IsMonitoredNamespace(namespace string) bool { + _, exists, _ := ic.informers[InformerKeyNamespace].GetStore().GetByKey(namespace) + return exists +} + +// Initializes Namespace monitoring +func (ic *InformerCollection) initNamespaceMonitor() { + monitorNamespaceLabel := map[string]string{constants.OSMKubeResourceMonitorAnnotation: ic.meshName} + + labelSelector := fields.SelectorFromSet(monitorNamespaceLabel).String() + option := informers.WithTweakListOptions(func(opt *metav1.ListOptions) { + opt.LabelSelector = labelSelector + }) + + informerFactory := informers.NewSharedInformerFactoryWithOptions(ic.kubeClient, DefaultKubeEventResyncInterval, option) + + // Add informer + informer := &informer{ + informer: informerFactory.Core().V1().Namespaces().Informer(), + } + + customStore := ic.customStores[InformerKeyNamespace] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyNamespace] = informer +} + +// Initializes Service monitoring +func (ic *InformerCollection) initServicesMonitor() { + informerFactory := informers.NewSharedInformerFactory(ic.kubeClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Core().V1().Services().Informer(), + } + + customStore := ic.customStores[InformerKeyService] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyService] = informer +} + +// Initializes Service Account monitoring +func (ic *InformerCollection) initServiceAccountsMonitor() { + informerFactory := informers.NewSharedInformerFactory(ic.kubeClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Core().V1().ServiceAccounts().Informer(), + } + + customStore := ic.customStores[InformerKeyServiceAccount] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyServiceAccount] = informer +} + +func (ic *InformerCollection) initPodMonitor() { + informerFactory := 
informers.NewSharedInformerFactory(ic.kubeClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Core().V1().Pods().Informer(), + } + + customStore := ic.customStores[InformerKeyPod] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyPod] = informer +} + +func (ic *InformerCollection) initEndpointMonitor() { + informerFactory := informers.NewSharedInformerFactory(ic.kubeClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Core().V1().Endpoints().Informer(), + } + + customStore := ic.customStores[InformerKeyEndpoints] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyEndpoints] = informer +} + +func (ic *InformerCollection) initTrafficSplitMonitor() { + informerFactory := smiTrafficSplitInformers.NewSharedInformerFactory(ic.smiTrafficSplitClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Split().V1alpha2().TrafficSplits().Informer(), + } + + customStore := ic.customStores[InformerKeyTrafficSplit] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyTrafficSplit] = informer +} + +func (ic *InformerCollection) initTrafficTargetMonitor() { + informerFactory := smiAccessInformers.NewSharedInformerFactory(ic.smiAccessClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Access().V1alpha3().TrafficTargets().Informer(), + } + + customStore := ic.customStores[InformerKeyTrafficTarget] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyTrafficTarget] = informer +} + +func (ic *InformerCollection) initHTTPRouteGroupMonitor() { + informerFactory := smiTrafficSpecInformers.NewSharedInformerFactory(ic.smiTrafficSpecClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Specs().V1alpha4().HTTPRouteGroups().Informer(), + } + + customStore := ic.customStores[InformerKeyHTTPRouteGroup] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyHTTPRouteGroup] = informer +} + +func (ic *InformerCollection) initTCPRouteMonitor() { + informerFactory := smiTrafficSpecInformers.NewSharedInformerFactory(ic.smiTrafficSpecClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Specs().V1alpha4().TCPRoutes().Informer(), + } + + customStore := ic.customStores[InformerKeyTCPRoute] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyTCPRoute] = informer +} + +func (ic *InformerCollection) initMeshConfigMonitor() { + informerFactory := configInformers.NewSharedInformerFactory(ic.configClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Config().V1alpha2().MeshConfigs().Informer(), + } + + customStore := ic.customStores[InformerKeyMeshConfig] + if customStore != nil { + informer.customStore = customStore + } + ic.informers[InformerKeyMeshConfig] = informer +} + +func (ic *InformerCollection) initMeshRootCertificateMonitor() { + informerFactory := configInformers.NewSharedInformerFactory(ic.configClient, DefaultKubeEventResyncInterval) + informer := &informer{ + informer: informerFactory.Config().V1alpha2().MeshRootCertificates().Informer(), + } + + customStore := ic.customStores[InformerKeyMeshRootCertificate] + if customStore != nil { + informer.customStore = customStore + } + 
ic.informers[InformerKeyMeshRootCertificate] = informer
+}
+
+func (ic *InformerCollection) initEgressMonitor() {
+	informerFactory := policyInformers.NewSharedInformerFactory(ic.policyClient, DefaultKubeEventResyncInterval)
+	informer := &informer{
+		informer: informerFactory.Policy().V1alpha1().Egresses().Informer(),
+	}
+
+	customStore := ic.customStores[InformerKeyEgress]
+	if customStore != nil {
+		informer.customStore = customStore
+	}
+	ic.informers[InformerKeyEgress] = informer
+}
+
+func (ic *InformerCollection) initIngressBackendMonitor() {
+	informerFactory := policyInformers.NewSharedInformerFactory(ic.policyClient, DefaultKubeEventResyncInterval)
+	informer := &informer{
+		informer: informerFactory.Policy().V1alpha1().IngressBackends().Informer(),
+	}
+
+	customStore := ic.customStores[InformerKeyIngressBackend]
+	if customStore != nil {
+		informer.customStore = customStore
+	}
+	ic.informers[InformerKeyIngressBackend] = informer
+}
+
+func (ic *InformerCollection) initUpstreamTrafficSettingMonitor() {
+	informerFactory := policyInformers.NewSharedInformerFactory(ic.policyClient, DefaultKubeEventResyncInterval)
+	informer := &informer{
+		informer: informerFactory.Policy().V1alpha1().UpstreamTrafficSettings().Informer(),
+	}
+
+	customStore := ic.customStores[InformerKeyUpstreamTrafficSetting]
+	if customStore != nil {
+		informer.customStore = customStore
+	}
+	ic.informers[InformerKeyUpstreamTrafficSetting] = informer
+}
+
+func (ic *InformerCollection) initRetryMonitor() {
+	informerFactory := policyInformers.NewSharedInformerFactory(ic.policyClient, DefaultKubeEventResyncInterval)
+	informer := &informer{
+		informer: informerFactory.Policy().V1alpha1().Retries().Informer(),
+	}
+
+	customStore := ic.customStores[InformerKeyRetry]
+	if customStore != nil {
+		informer.customStore = customStore
+	}
+	ic.informers[InformerKeyRetry] = informer
+}
diff --git a/pkg/k8s/informers/types.go b/pkg/k8s/informers/types.go
new file mode 100644
index 0000000000..e93c6e5008
--- /dev/null
+++ b/pkg/k8s/informers/types.go
@@ -0,0 +1,106 @@
+package informers
+
+import (
+	"errors"
+	"time"
+
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+
+	smiTrafficAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned"
+	smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned"
+	smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
+
+	configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
+	policyClientset "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned"
+)
+
+// InformerKey stores the different Informers we keep for K8s resources
+type InformerKey string
+
+const (
+	// InformerKeyNamespace is the InformerKey for a Namespace informer
+	InformerKeyNamespace InformerKey = "Namespace"
+	// InformerKeyService is the InformerKey for a Service informer
+	InformerKeyService InformerKey = "Service"
+	// InformerKeyPod is the InformerKey for a Pod informer
+	InformerKeyPod InformerKey = "Pod"
+	// InformerKeyEndpoints is the InformerKey for an Endpoints informer
+	InformerKeyEndpoints InformerKey = "Endpoints"
+	// InformerKeyServiceAccount is the InformerKey for a ServiceAccount informer
+	InformerKeyServiceAccount InformerKey = "ServiceAccount"
+
+	// InformerKeyTrafficSplit is the InformerKey for a TrafficSplit informer
+	InformerKeyTrafficSplit InformerKey = "TrafficSplit"
+	// InformerKeyTrafficTarget is the InformerKey for a TrafficTarget informer
+	InformerKeyTrafficTarget InformerKey = "TrafficTarget"
+	// InformerKeyHTTPRouteGroup is the InformerKey for an HTTPRouteGroup informer
+	InformerKeyHTTPRouteGroup InformerKey = "HTTPRouteGroup"
+	// InformerKeyTCPRoute is the InformerKey for a TCPRoute informer
+	InformerKeyTCPRoute InformerKey = "TCPRoute"
+
+	// InformerKeyMeshConfig is the InformerKey for a MeshConfig informer
+	InformerKeyMeshConfig InformerKey = "MeshConfig"
+	// InformerKeyMeshRootCertificate is the InformerKey for a MeshRootCertificate informer
+	InformerKeyMeshRootCertificate InformerKey = "MeshRootCertificate"
+
+	// InformerKeyEgress is the InformerKey for an Egress informer
+	InformerKeyEgress InformerKey = "Egress"
+	// InformerKeyIngressBackend is the InformerKey for an IngressBackend informer
+	InformerKeyIngressBackend InformerKey = "IngressBackend"
+	// InformerKeyUpstreamTrafficSetting is the InformerKey for an UpstreamTrafficSetting informer
+	InformerKeyUpstreamTrafficSetting InformerKey = "UpstreamTrafficSetting"
+	// InformerKeyRetry is the InformerKey for a Retry informer
+	InformerKeyRetry InformerKey = "Retry"
+)
+
+const (
+	// DefaultKubeEventResyncInterval is the default resync interval for k8s events
+	// This is set to 0 because we do not need resyncs from the k8s client, and have our
+	// own Ticker to turn on periodic resyncs.
+	DefaultKubeEventResyncInterval = 0 * time.Second
+)
+
+var (
+	errInitInformers = errors.New("informer not initialized")
+	errSyncingCaches = errors.New("failed initial cache sync for informers")
+)
+
+type informer struct {
+	customStore cache.Store
+	informer    cache.SharedIndexInformer
+}
+
+// InformerCollection is an abstraction around a set of informers
+// initialized with the clients stored in its fields.
This data +// type should only be passed around as a pointer +type InformerCollection struct { + informers map[InformerKey]*informer + meshName string + kubeClient kubernetes.Interface + smiTrafficSplitClient smiTrafficSplitClient.Interface + smiTrafficSpecClient smiTrafficSpecClient.Interface + smiAccessClient smiTrafficAccessClient.Interface + configClient configClientset.Interface + policyClient policyClientset.Interface + selectedInformers map[InformerKey]struct{} + customStores map[InformerKey]cache.Store +} + +type informerInit func() + +func (i *informer) GetStore() cache.Store { + if i.customStore != nil { + return i.customStore + } + + return i.informer.GetStore() +} + +func (i *informer) HasSynced() bool { + return i.informer.HasSynced() +} + +func (i *informer) Run(stop <-chan struct{}) { + i.informer.Run(stop) +} diff --git a/pkg/k8s/types.go b/pkg/k8s/types.go index 9d8c0b72c4..bb55701e41 100644 --- a/pkg/k8s/types.go +++ b/pkg/k8s/types.go @@ -8,15 +8,14 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "github.com/openservicemesh/osm/pkg/envoy" policyv1alpha1Client "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned" - "github.com/openservicemesh/osm/pkg/messaging" "github.com/openservicemesh/osm/pkg/identity" + "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/logger" + "github.com/openservicemesh/osm/pkg/messaging" "github.com/openservicemesh/osm/pkg/service" ) @@ -65,15 +64,10 @@ const ( ServiceAccounts InformerKey = "ServiceAccounts" ) -// informerCollection is the type holding the collection of informers we keep -type informerCollection map[InformerKey]cache.SharedIndexInformer - // client is the type used to represent the k8s client for the native k8s resources type client struct { - meshName string - kubeClient kubernetes.Interface policyClient policyv1alpha1Client.Interface - informers informerCollection + informers *informers.InformerCollection msgBroker *messaging.Broker } diff --git a/pkg/smi/client_test.go b/pkg/smi/client_test.go index 2be44f3651..a03d1aa81f 100644 --- a/pkg/smi/client_test.go +++ b/pkg/smi/client_test.go @@ -23,6 +23,7 @@ import ( "github.com/openservicemesh/osm/pkg/constants" "github.com/openservicemesh/osm/pkg/identity" "github.com/openservicemesh/osm/pkg/k8s" + "github.com/openservicemesh/osm/pkg/k8s/informers" "github.com/openservicemesh/osm/pkg/messaging" "github.com/openservicemesh/osm/pkg/service" "github.com/openservicemesh/osm/pkg/tests" @@ -47,11 +48,17 @@ func bootstrapClient(stop chan struct{}) (*client, *fakeKubeClientSet, error) { smiTrafficSpecClientSet := testTrafficSpecClient.NewSimpleClientset() smiTrafficTargetClientSet := testTrafficTargetClient.NewSimpleClientset() msgBroker := messaging.NewBroker(stop) - kubernetesClient, err := k8s.NewKubernetesController(kubeClient, nil, meshName, stop, msgBroker) + informerCollection, err := informers.NewInformerCollection(meshName, stop, + informers.WithKubeClient(kubeClient), + informers.WithSMIClients(smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet), + ) + if err != nil { return nil, nil, err } + kubernetesClient := k8s.NewKubernetesController(informerCollection, nil, msgBroker) + fakeClientSet := &fakeKubeClientSet{ kubeClient: kubeClient, smiTrafficSplitClientSet: smiTrafficSplitClientSet,