Skip to content
This repository was archived by the owner on Jul 11, 2023. It is now read-only.

(k8s/informers): Use InformerCollection for other clients #4804

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions cmd/osm-bootstrap/osm-bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/openservicemesh/osm/pkg/crdconversion"
"github.com/openservicemesh/osm/pkg/httpserver"
"github.com/openservicemesh/osm/pkg/k8s/events"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/logger"
"github.com/openservicemesh/osm/pkg/messaging"
"github.com/openservicemesh/osm/pkg/metricsstore"
Expand Down Expand Up @@ -196,12 +197,18 @@ func main() {

msgBroker := messaging.NewBroker(stop)

// Initialize Configurator to watch resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker)
informerCollection, err := informers.NewInformerCollection(meshName, stop,
informers.WithKubeClient(kubeClient),
informers.WithConfigClient(configClient),
)

if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection")
}

// Initialize Configurator to watch resources in the config.openservicemesh.io API group
cfg := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)

certOpts, err := getCertOptions()
if err != nil {
log.Fatal().Err(err).Msg("Error getting certificate options")
Expand Down
16 changes: 4 additions & 12 deletions cmd/osm-controller/osm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,12 @@ func main() {
}

// This component will be watching resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
}
cfg := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)

k8sClient := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker)

meshSpec, err := smi.NewMeshSpecClient(kubeConfig, osmNamespace, k8sClient, stop, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating MeshSpec")
}
meshSpec := smi.NewSMIClient(informerCollection, osmNamespace, k8sClient, msgBroker)

certOpts, err := getCertOptions()
if err != nil {
log.Fatal().Err(err).Msg("Error getting certificate options")
Expand Down Expand Up @@ -249,10 +244,7 @@ func main() {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Ingress client")
}

policyController, err := policy.NewPolicyController(k8sClient, policyClient, stop, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for policy.openservicemesh.io")
}
policyController := policy.NewPolicyController(informerCollection, k8sClient, msgBroker)

meshCatalog := catalog.NewMeshCatalog(
k8sClient,
Expand Down
5 changes: 1 addition & 4 deletions cmd/osm-injector/osm-injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,7 @@ func main() {
}

// Initialize Configurator to watch resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClientset.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmMeshConfigName, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
}
cfg := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)

// Initialize kubernetes.Controller to watch kubernetes resources
kubeController := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker, k8s.Namespaces)
Expand Down
6 changes: 5 additions & 1 deletion pkg/catalog/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"k8s.io/client-go/kubernetes"

configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
"github.com/openservicemesh/osm/pkg/k8s/informers"

"github.com/openservicemesh/osm/pkg/catalog"
tresorFake "github.com/openservicemesh/osm/pkg/certificate/providers/tresor/fake"
Expand Down Expand Up @@ -46,10 +47,13 @@ func NewFakeMeshCatalog(kubeClient kubernetes.Interface, meshConfigClient config

osmNamespace := "-test-osm-namespace-"
osmMeshConfigName := "-test-osm-mesh-config-"
cfg, err := configurator.NewConfigurator(meshConfigClient, stop, osmNamespace, osmMeshConfigName, nil)
ic, err := informers.NewInformerCollection("osm", stop, informers.WithKubeClient(kubeClient), informers.WithConfigClient(meshConfigClient))
if err != nil {
return nil
}

cfg := configurator.NewConfigurator(ic, osmNamespace, osmMeshConfigName, nil)

certManager := tresorFake.NewFake(nil)

// #1683 tracks potential improvements to the following dynamic mocks
Expand Down
105 changes: 13 additions & 92 deletions pkg/configurator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@ import (
"fmt"
"reflect"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"

configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
configInformers "github.com/openservicemesh/osm/pkg/gen/client/config/informers/externalversions"
"github.com/openservicemesh/osm/pkg/k8s/informers"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/errcode"
Expand All @@ -21,44 +17,9 @@ import (
)

// NewConfigurator implements configurator.Configurator and creates the Kubernetes client to manage namespaces.
func NewConfigurator(configClient configClientset.Interface, stop <-chan struct{}, osmNamespace, meshConfigName string,
msgBroker *messaging.Broker) (Configurator, error) {
return newConfigurator(configClient, stop, osmNamespace, meshConfigName, msgBroker)
}

func newConfigurator(configClient configClientset.Interface, stop <-chan struct{}, osmNamespace string, meshConfigName string,
msgBroker *messaging.Broker) (*client, error) {
listOption := configInformers.WithTweakListOptions(func(opt *metav1.ListOptions) {
opt.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, meshConfigName).String()
})

meshConfigInformerFactory := configInformers.NewSharedInformerFactoryWithOptions(
configClient,
k8s.DefaultKubeEventResyncInterval,
configInformers.WithNamespace(osmNamespace),
listOption,
)

// informerFactory without listOptions
configInformerFactory := configInformers.NewSharedInformerFactoryWithOptions(
configClient,
k8s.DefaultKubeEventResyncInterval,
configInformers.WithNamespace(osmNamespace),
)

informerCollection := informerCollection{
meshConfig: meshConfigInformerFactory.Config().V1alpha2().MeshConfigs().Informer(),
meshRootCertificate: configInformerFactory.Config().V1alpha2().MeshRootCertificates().Informer(),
}

cacheCollection := cacheCollection{
meshConfig: informerCollection.meshConfig.GetStore(),
meshRootCertificate: informerCollection.meshRootCertificate.GetStore(),
}

c := &client{
informers: &informerCollection,
caches: &cacheCollection,
func NewConfigurator(informerCollection *informers.InformerCollection, osmNamespace, meshConfigName string, msgBroker *messaging.Broker) *Client {
c := &Client{
informers: informerCollection,
osmNamespace: osmNamespace,
meshConfigName: meshConfigName,
}
Expand All @@ -69,70 +30,30 @@ func newConfigurator(configClient configClientset.Interface, stop <-chan struct{
Update: announcements.MeshConfigUpdated,
Delete: announcements.MeshConfigDeleted,
}
informerCollection.meshConfig.AddEventHandler(k8s.GetEventHandlerFuncs(nil, meshConfigEventTypes, msgBroker))
informerCollection.meshConfig.AddEventHandler(c.metricsHandler())

informerCollection.AddEventHandler(informers.InformerKeyMeshConfig, k8s.GetEventHandlerFuncs(nil, meshConfigEventTypes, msgBroker))
informerCollection.AddEventHandler(informers.InformerKeyMeshConfig, c.metricsHandler())

meshRootCertificateEventTypes := k8s.EventTypes{
Add: announcements.MeshRootCertificateAdded,
Update: announcements.MeshRootCertificateUpdated,
Delete: announcements.MeshRootCertificateDeleted,
}
informerCollection.meshRootCertificate.AddEventHandler(k8s.GetEventHandlerFuncs(nil, meshRootCertificateEventTypes, msgBroker))

err := c.run(stop)
if err != nil {
return c, errors.Errorf("Could not start %s informer clients: %s", configv1alpha2.SchemeGroupVersion, err)
}

return c, nil
}

func (c *client) run(stop <-chan struct{}) error {
log.Info().Msgf("Starting informer clients for API group %s", configv1alpha2.SchemeGroupVersion)

if c.informers == nil {
return errors.New("config.openservicemesh.io informers not initialized")
}

sharedInformers := map[string]cache.SharedInformer{
"MeshConfig": c.informers.meshConfig,
"MeshRootCertificate": c.informers.meshRootCertificate,
}

var informerNames []string
var hasSynced []cache.InformerSynced
for name, informer := range sharedInformers {
if informer == nil {
log.Error().Msgf("Informer for '%s' not initialized, ignoring it", name) // TODO: log with errcode
continue
}
informerNames = append(informerNames, name)
log.Info().Msgf("Starting informer: %s", name)
go informer.Run(stop)
hasSynced = append(hasSynced, informer.HasSynced)
}

log.Info().Msgf("Waiting for informers %v caches to sync", informerNames)
if !cache.WaitForCacheSync(stop, hasSynced...) {
// TODO(#3962): metric might not be scraped before process restart resulting from this error
log.Error().Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrConfigInformerInitCache)).Msg("Failed initial cache sync for config.openservicemesh.io informers")
return errors.New("Failed initial cache sync for config.openservicemesh.io informers")
}
informerCollection.AddEventHandler(informers.InformerKeyMeshRootCertificate, k8s.GetEventHandlerFuncs(nil, meshRootCertificateEventTypes, msgBroker))

log.Info().Msgf("Cache sync finished for %v informers in API group %s", informerNames, configv1alpha2.SchemeGroupVersion)
return nil
return c
}

func (c *client) getMeshConfigCacheKey() string {
func (c *Client) getMeshConfigCacheKey() string {
return fmt.Sprintf("%s/%s", c.osmNamespace, c.meshConfigName)
}

// Returns the current MeshConfig
func (c *client) getMeshConfig() configv1alpha2.MeshConfig {
func (c *Client) getMeshConfig() configv1alpha2.MeshConfig {
var meshConfig configv1alpha2.MeshConfig

meshConfigCacheKey := c.getMeshConfigCacheKey()
item, exists, err := c.caches.meshConfig.GetByKey(meshConfigCacheKey)
item, exists, err := c.informers.GetByKey(informers.InformerKeyMeshConfig, meshConfigCacheKey)
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMeshConfigFetchFromCache)).Msgf("Error getting MeshConfig from cache with key %s", meshConfigCacheKey)
return meshConfig
Expand All @@ -147,7 +68,7 @@ func (c *client) getMeshConfig() configv1alpha2.MeshConfig {
return meshConfig
}

func (c *client) metricsHandler() cache.ResourceEventHandlerFuncs {
func (c *Client) metricsHandler() cache.ResourceEventHandlerFuncs {
handleMetrics := func(obj interface{}) {
config := obj.(*configv1alpha2.MeshConfig)

Expand Down
21 changes: 8 additions & 13 deletions pkg/configurator/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

fakeConfig "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned/fake"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/metricsstore"

configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
Expand All @@ -23,9 +23,12 @@ func TestGetMeshConfig(t *testing.T) {

meshConfigClient := fakeConfig.NewSimpleClientset()
stop := make(chan struct{})
c, err := newConfigurator(meshConfigClient, stop, osmNamespace, osmMeshConfigName, nil)

ic, err := informers.NewInformerCollection("osm", stop, informers.WithConfigClient(meshConfigClient))
a.Nil(err)

c := NewConfigurator(ic, osmNamespace, osmMeshConfigName, nil)

// Returns empty MeshConfig if informer cache is empty
a.Equal(configv1alpha2.MeshConfig{}, c.getMeshConfig())

Expand All @@ -39,25 +42,17 @@ func TestGetMeshConfig(t *testing.T) {
Name: osmMeshConfigName,
},
}
err = c.caches.meshConfig.Add(newObj)
err = c.informers.Add(informers.InformerKeyMeshConfig, newObj, t)
a.Nil(err)
a.Equal(*newObj, c.getMeshConfig())
}

type store struct {
cache.Store
}

func (s *store) GetByKey(_ string) (interface{}, bool, error) {
return nil, false, nil
}

func TestMetricsHandler(t *testing.T) {
a := assert.New(t)

c := &client{
caches: &cacheCollection{meshConfig: &store{}},
c := &Client{
meshConfigName: osmMeshConfigName,
informers: &informers.InformerCollection{},
}
handlers := c.metricsHandler()
metricsstore.DefaultMetricsStore.Start(metricsstore.DefaultMetricsStore.FeatureFlagEnabled)
Expand Down
Loading