Skip to content
This repository was archived by the owner on Jul 11, 2023. It is now read-only.

k8s/informers: centralize informers to simplify code #4801

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions cmd/osm-controller/osm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"time"

"github.com/pkg/errors"
smiAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned"
smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned"
smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
"github.com/spf13/pflag"
admissionv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -43,6 +46,7 @@ import (
"github.com/openservicemesh/osm/pkg/ingress"
"github.com/openservicemesh/osm/pkg/k8s"
"github.com/openservicemesh/osm/pkg/k8s/events"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/logger"
"github.com/openservicemesh/osm/pkg/messaging"
"github.com/openservicemesh/osm/pkg/metricsstore"
Expand Down Expand Up @@ -151,6 +155,7 @@ func main() {
}
kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)
policyClient := policyClientset.NewForConfigOrDie(kubeConfig)
configClient := configClientset.NewForConfigOrDie(kubeConfig)

// Initialize the generic Kubernetes event recorder and associate it with the osm-controller pod resource
controllerPod, err := getOSMControllerPod(kubeClient)
Expand All @@ -176,17 +181,28 @@ func main() {

msgBroker := messaging.NewBroker(stop)

// This component will be watching resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClientset.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmMeshConfigName, msgBroker)
smiTrafficSplitClientSet := smiTrafficSplitClient.NewForConfigOrDie(kubeConfig)
smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(kubeConfig)
smiTrafficTargetClientSet := smiAccessClient.NewForConfigOrDie(kubeConfig)

informerCollection, err := informers.NewInformerCollection(meshName, stop,
informers.WithKubeClient(kubeClient),
informers.WithSMIClients(smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet),
informers.WithConfigClient(configClient),
informers.WithPolicyClient(policyClient),
)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection")
}

k8sClient, err := k8s.NewKubernetesController(kubeClient, policyClient, meshName, stop, msgBroker)
// This component will be watching resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes Controller")
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
}

k8sClient := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker)

meshSpec, err := smi.NewMeshSpecClient(kubeConfig, osmNamespace, k8sClient, stop, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating MeshSpec")
Expand Down Expand Up @@ -215,16 +231,16 @@ func main() {
}
}

var configClient config.Controller
var multiclusterConfigClient config.Controller

if cfg.GetFeatureFlags().EnableMulticlusterMode {
if configClient, err = config.NewConfigController(kubeConfig, k8sClient, stop, msgBroker); err != nil {
if multiclusterConfigClient, err = config.NewConfigController(kubeConfig, k8sClient, stop, msgBroker); err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes config client")
}
}

// A nil configClient is passed in if multi cluster mode is not enabled.
kubeProvider := kube.NewClient(k8sClient, configClient, cfg)
// A nil multiclusterConfigClient is passed in if multi cluster mode is not enabled.
kubeProvider := kube.NewClient(k8sClient, multiclusterConfigClient, cfg)

endpointsProviders := []endpoint.Provider{kubeProvider}
serviceProviders := []service.Provider{kubeProvider}
Expand Down
25 changes: 21 additions & 4 deletions cmd/osm-injector/osm-injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"time"

"github.com/pkg/errors"
smiAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned"
smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned"
smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
"github.com/spf13/pflag"
admissionv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -32,6 +35,7 @@ import (
"github.com/openservicemesh/osm/pkg/injector"
"github.com/openservicemesh/osm/pkg/k8s"
"github.com/openservicemesh/osm/pkg/k8s/events"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/logger"
"github.com/openservicemesh/osm/pkg/messaging"
"github.com/openservicemesh/osm/pkg/metricsstore"
Expand Down Expand Up @@ -139,6 +143,7 @@ func main() {
}
kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)
policyClient := policyClientset.NewForConfigOrDie(kubeConfig)
configClient := configClientset.NewForConfigOrDie(kubeConfig)

// Initialize the generic Kubernetes event recorder and associate it with the osm-injector pod resource
injectorPod, err := getInjectorPod(kubeClient)
Expand Down Expand Up @@ -172,17 +177,29 @@ func main() {

msgBroker := messaging.NewBroker(stop)

smiTrafficSplitClientSet := smiTrafficSplitClient.NewForConfigOrDie(kubeConfig)
smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(kubeConfig)
smiTrafficTargetClientSet := smiAccessClient.NewForConfigOrDie(kubeConfig)

informerCollection, err := informers.NewInformerCollection(meshName, stop,
informers.WithKubeClient(kubeClient),
informers.WithSMIClients(smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet),
informers.WithConfigClient(configClient),
informers.WithPolicyClient(policyClient),
)

if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection")
}

// Initialize Configurator to watch resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClientset.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmMeshConfigName, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
}

// Initialize kubernetes.Controller to watch kubernetes resources
kubeController, err := k8s.NewKubernetesController(kubeClient, policyClient, meshName, stop, msgBroker, k8s.Namespaces)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Kubernetes Controller")
}
kubeController := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker, k8s.Namespaces)

certOpts, err := getCertOptions()
if err != nil {
Expand Down
108 changes: 21 additions & 87 deletions pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,33 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/pointer"

"github.com/openservicemesh/osm/pkg/announcements"
policyv1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/errcode"
policyv1alpha1Client "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned"
"github.com/openservicemesh/osm/pkg/messaging"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/identity"
osminformers "github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/service"
)

// NewKubernetesController returns a new kubernetes.Controller which means to provide access to locally-cached k8s resources
func NewKubernetesController(kubeClient kubernetes.Interface, policyClient policyv1alpha1Client.Interface, meshName string,
stop <-chan struct{}, msgBroker *messaging.Broker, selectInformers ...InformerKey) (Controller, error) {
return newClient(kubeClient, policyClient, meshName, stop, msgBroker, selectInformers...)
func NewKubernetesController(informerCollection *osminformers.InformerCollection, policyClient policyv1alpha1Client.Interface, msgBroker *messaging.Broker, selectInformers ...InformerKey) Controller {
return newClient(informerCollection, policyClient, msgBroker, selectInformers...)
}

func newClient(kubeClient kubernetes.Interface, policyClient policyv1alpha1Client.Interface, meshName string,
stop <-chan struct{}, msgBroker *messaging.Broker, selectInformers ...InformerKey) (*client, error) {
func newClient(informerCollection *osminformers.InformerCollection, policyClient policyv1alpha1Client.Interface, msgBroker *messaging.Broker, selectInformers ...InformerKey) *client {
// Initialize client object
c := &client{
kubeClient: kubeClient,
policyClient: policyClient,
meshName: meshName,
informers: informerCollection{},
informers: informerCollection,
msgBroker: msgBroker,
policyClient: policyClient,
}

// Initialize informers
Expand All @@ -65,35 +58,18 @@ func newClient(kubeClient kubernetes.Interface, policyClient policyv1alpha1Clien
informerInitHandlerMap[informer]()
}

if err := c.run(stop); err != nil {
log.Error().Err(err).Msg("Could not start Kubernetes Namespaces client")
return nil, err
}

return c, nil
return c
}

// Initializes Namespace monitoring
func (c *client) initNamespaceMonitor() {
monitorNamespaceLabel := map[string]string{constants.OSMKubeResourceMonitorAnnotation: c.meshName}

labelSelector := fields.SelectorFromSet(monitorNamespaceLabel).String()
option := informers.WithTweakListOptions(func(opt *metav1.ListOptions) {
opt.LabelSelector = labelSelector
})

informerFactory := informers.NewSharedInformerFactoryWithOptions(c.kubeClient, DefaultKubeEventResyncInterval, option)

// Add informer
c.informers[Namespaces] = informerFactory.Core().V1().Namespaces().Informer()

// Add event handler to informer
nsEventTypes := EventTypes{
Add: announcements.NamespaceAdded,
Update: announcements.NamespaceUpdated,
Delete: announcements.NamespaceDeleted,
}
c.informers[Namespaces].AddEventHandler(GetEventHandlerFuncs(nil, nsEventTypes, c.msgBroker))
c.informers.AddEventHandler(osminformers.InformerKeyNamespace, GetEventHandlerFuncs(nil, nsEventTypes, c.msgBroker))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can drop the prefix InformerKey from the variable name. Same applies to other informer keys.

Suggested change
c.informers.AddEventHandler(osminformers.InformerKeyNamespace, GetEventHandlerFuncs(nil, nsEventTypes, c.msgBroker))
c.informers.AddEventHandler(osminformers.NamespaceKey, GetEventHandlerFuncs(nil, nsEventTypes, c.msgBroker))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prefix is actually a naming convention used in client-go and other packages when creating variables for a type alias. It helps remove ambiguity when searching for ambiguous or overloaded vars (like namespace or pod)

}

// Function to filter K8s meta Objects by OSM's isMonitoredNamespace
Expand All @@ -107,94 +83,52 @@ func (c *client) shouldObserve(obj interface{}) bool {

// Initializes Service monitoring
func (c *client) initServicesMonitor() {
informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval)
c.informers[Services] = informerFactory.Core().V1().Services().Informer()

svcEventTypes := EventTypes{
Add: announcements.ServiceAdded,
Update: announcements.ServiceUpdated,
Delete: announcements.ServiceDeleted,
}
c.informers[Services].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker))
c.informers.AddEventHandler(osminformers.InformerKeyService, GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker))
}

// Initializes Service Account monitoring
func (c *client) initServiceAccountsMonitor() {
informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval)
c.informers[ServiceAccounts] = informerFactory.Core().V1().ServiceAccounts().Informer()

svcEventTypes := EventTypes{
Add: announcements.ServiceAccountAdded,
Update: announcements.ServiceAccountUpdated,
Delete: announcements.ServiceAccountDeleted,
}
c.informers[ServiceAccounts].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker))
c.informers.AddEventHandler(osminformers.InformerKeyServiceAccount, GetEventHandlerFuncs(c.shouldObserve, svcEventTypes, c.msgBroker))
}

func (c *client) initPodMonitor() {
informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval)
c.informers[Pods] = informerFactory.Core().V1().Pods().Informer()

podEventTypes := EventTypes{
Add: announcements.PodAdded,
Update: announcements.PodUpdated,
Delete: announcements.PodDeleted,
}
c.informers[Pods].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, podEventTypes, c.msgBroker))
c.informers.AddEventHandler(osminformers.InformerKeyPod, GetEventHandlerFuncs(c.shouldObserve, podEventTypes, c.msgBroker))
}

func (c *client) initEndpointMonitor() {
informerFactory := informers.NewSharedInformerFactory(c.kubeClient, DefaultKubeEventResyncInterval)
c.informers[Endpoints] = informerFactory.Core().V1().Endpoints().Informer()

eptEventTypes := EventTypes{
Add: announcements.EndpointAdded,
Update: announcements.EndpointUpdated,
Delete: announcements.EndpointDeleted,
}
c.informers[Endpoints].AddEventHandler(GetEventHandlerFuncs(c.shouldObserve, eptEventTypes, c.msgBroker))
}

func (c *client) run(stop <-chan struct{}) error {
log.Info().Msg("Namespace controller client started")
var hasSynced []cache.InformerSynced
var names []string

if c.informers == nil {
return errInitInformers
}

for name, informer := range c.informers {
if informer == nil {
continue
}

go informer.Run(stop)
names = append(names, (string)(name))
log.Info().Msgf("Waiting for %s informer cache sync...", name)
hasSynced = append(hasSynced, informer.HasSynced)
}

if !cache.WaitForCacheSync(stop, hasSynced...) {
return errSyncingCaches
}

log.Info().Msgf("Caches for %v synced successfully", names)

return nil
c.informers.AddEventHandler(osminformers.InformerKeyEndpoints, GetEventHandlerFuncs(c.shouldObserve, eptEventTypes, c.msgBroker))
}

// IsMonitoredNamespace returns a boolean indicating if the namespace is among the list of monitored namespaces
func (c client) IsMonitoredNamespace(namespace string) bool {
_, exists, _ := c.informers[Namespaces].GetStore().GetByKey(namespace)
return exists
return c.informers.IsMonitoredNamespace(namespace)
}

// ListMonitoredNamespaces returns all namespaces that the mesh is monitoring.
func (c client) ListMonitoredNamespaces() ([]string, error) {
var namespaces []string

for _, ns := range c.informers[Namespaces].GetStore().List() {
for _, ns := range c.informers.List(osminformers.InformerKeyNamespace) {
namespace, ok := ns.(*corev1.Namespace)
if !ok {
log.Error().Err(errListingNamespaces).Msg("Failed to list monitored namespaces")
Expand All @@ -208,7 +142,7 @@ func (c client) ListMonitoredNamespaces() ([]string, error) {
// GetService retrieves the Kubernetes Services resource for the given MeshService
func (c client) GetService(svc service.MeshService) *corev1.Service {
// client-go cache uses <namespace>/<name> as key
svcIf, exists, err := c.informers[Services].GetStore().GetByKey(svc.NamespacedKey())
svcIf, exists, err := c.informers.GetByKey(osminformers.InformerKeyService, svc.NamespacedKey())
if exists && err == nil {
svc := svcIf.(*corev1.Service)
return svc
Expand All @@ -220,7 +154,7 @@ func (c client) GetService(svc service.MeshService) *corev1.Service {
func (c client) ListServices() []*corev1.Service {
var services []*corev1.Service

for _, serviceInterface := range c.informers[Services].GetStore().List() {
for _, serviceInterface := range c.informers.List(osminformers.InformerKeyService) {
svc := serviceInterface.(*corev1.Service)

if !c.IsMonitoredNamespace(svc.Namespace) {
Expand All @@ -235,7 +169,7 @@ func (c client) ListServices() []*corev1.Service {
func (c client) ListServiceAccounts() []*corev1.ServiceAccount {
var serviceAccounts []*corev1.ServiceAccount

for _, serviceInterface := range c.informers[ServiceAccounts].GetStore().List() {
for _, serviceInterface := range c.informers.List(osminformers.InformerKeyServiceAccount) {
sa := serviceInterface.(*corev1.ServiceAccount)

if !c.IsMonitoredNamespace(sa.Namespace) {
Expand All @@ -248,7 +182,7 @@ func (c client) ListServiceAccounts() []*corev1.ServiceAccount {

// GetNamespace returns a Namespace resource if found, nil otherwise.
func (c client) GetNamespace(ns string) *corev1.Namespace {
nsIf, exists, err := c.informers[Namespaces].GetStore().GetByKey(ns)
nsIf, exists, err := c.informers.GetByKey(osminformers.InformerKeyNamespace, ns)
if exists && err == nil {
ns := nsIf.(*corev1.Namespace)
return ns
Expand All @@ -262,7 +196,7 @@ func (c client) GetNamespace(ns string) *corev1.Namespace {
func (c client) ListPods() []*corev1.Pod {
var pods []*corev1.Pod

for _, podInterface := range c.informers[Pods].GetStore().List() {
for _, podInterface := range c.informers.List(osminformers.InformerKeyPod) {
pod := podInterface.(*corev1.Pod)
if !c.IsMonitoredNamespace(pod.Namespace) {
continue
Expand All @@ -275,7 +209,7 @@ func (c client) ListPods() []*corev1.Pod {
// GetEndpoints returns the endpoint for a given service, otherwise returns nil if not found
// or error if the API errored out.
func (c client) GetEndpoints(svc service.MeshService) (*corev1.Endpoints, error) {
ep, exists, err := c.informers[Endpoints].GetStore().GetByKey(svc.NamespacedKey())
ep, exists, err := c.informers.GetByKey(osminformers.InformerKeyEndpoints, svc.NamespacedKey())
if err != nil {
return nil, err
}
Expand Down
Loading