Skip to content
This repository was archived by the owner on Jul 11, 2023. It is now read-only.

(k8s/informers): Use InformerCollection for other clients #4804

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cmd/osm-bootstrap/osm-bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/openservicemesh/osm/pkg/crdconversion"
"github.com/openservicemesh/osm/pkg/httpserver"
"github.com/openservicemesh/osm/pkg/k8s/events"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/logger"
"github.com/openservicemesh/osm/pkg/messaging"
"github.com/openservicemesh/osm/pkg/metricsstore"
Expand Down Expand Up @@ -196,8 +197,17 @@ func main() {

msgBroker := messaging.NewBroker(stop)

informerCollection, err := informers.NewInformerCollection(meshName, stop,
informers.WithKubeClient(kubeClient),
informers.WithConfigClient(configClient),
)

if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection")
}

// Initialize Configurator to watch resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker)
cfg, err := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/osm-controller/osm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func main() {
}

// This component will be watching resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker)
cfg, err := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/osm-injector/osm-injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func main() {
}

// Initialize Configurator to watch resources in the config.openservicemesh.io API group
cfg, err := configurator.NewConfigurator(configClientset.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmMeshConfigName, msgBroker)
cfg, err := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)
if err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/catalog/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"k8s.io/client-go/kubernetes"

configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
"github.com/openservicemesh/osm/pkg/k8s/informers"

"github.com/openservicemesh/osm/pkg/catalog"
tresorFake "github.com/openservicemesh/osm/pkg/certificate/providers/tresor/fake"
Expand Down Expand Up @@ -46,7 +47,12 @@ func NewFakeMeshCatalog(kubeClient kubernetes.Interface, meshConfigClient config

osmNamespace := "-test-osm-namespace-"
osmMeshConfigName := "-test-osm-mesh-config-"
cfg, err := configurator.NewConfigurator(meshConfigClient, stop, osmNamespace, osmMeshConfigName, nil)
ic, err := informers.NewInformerCollection("osm", stop, informers.WithKubeClient(kubeClient), informers.WithConfigClient(meshConfigClient))
if err != nil {
return nil
}

cfg, err := configurator.NewConfigurator(ic, osmNamespace, osmMeshConfigName, nil)
if err != nil {
return nil
}
Expand Down
92 changes: 10 additions & 82 deletions pkg/configurator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@ import (
"fmt"
"reflect"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"

configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
configInformers "github.com/openservicemesh/osm/pkg/gen/client/config/informers/externalversions"
"github.com/openservicemesh/osm/pkg/k8s/informers"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/errcode"
Expand All @@ -21,44 +17,16 @@ import (
)

// NewConfigurator implements configurator.Configurator and creates the Kubernetes client to manage namespaces.
func NewConfigurator(configClient configClientset.Interface, stop <-chan struct{}, osmNamespace, meshConfigName string,
func NewConfigurator(informerCollection *informers.InformerCollection, osmNamespace, meshConfigName string,
msgBroker *messaging.Broker) (Configurator, error) {
return newConfigurator(configClient, stop, osmNamespace, meshConfigName, msgBroker)
return newConfigurator(informerCollection, osmNamespace, meshConfigName, msgBroker)
}

func newConfigurator(configClient configClientset.Interface, stop <-chan struct{}, osmNamespace string, meshConfigName string,
func newConfigurator(informerCollection *informers.InformerCollection, osmNamespace string, meshConfigName string,
msgBroker *messaging.Broker) (*client, error) {
listOption := configInformers.WithTweakListOptions(func(opt *metav1.ListOptions) {
opt.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, meshConfigName).String()
})

meshConfigInformerFactory := configInformers.NewSharedInformerFactoryWithOptions(
configClient,
k8s.DefaultKubeEventResyncInterval,
configInformers.WithNamespace(osmNamespace),
listOption,
)

// informerFactory without listOptions
configInformerFactory := configInformers.NewSharedInformerFactoryWithOptions(
configClient,
k8s.DefaultKubeEventResyncInterval,
configInformers.WithNamespace(osmNamespace),
)

informerCollection := informerCollection{
meshConfig: meshConfigInformerFactory.Config().V1alpha2().MeshConfigs().Informer(),
meshRootCertificate: configInformerFactory.Config().V1alpha2().MeshRootCertificates().Informer(),
}

cacheCollection := cacheCollection{
meshConfig: informerCollection.meshConfig.GetStore(),
meshRootCertificate: informerCollection.meshRootCertificate.GetStore(),
}

c := &client{
informers: &informerCollection,
caches: &cacheCollection,
informers: informerCollection,
osmNamespace: osmNamespace,
meshConfigName: meshConfigName,
}
Expand All @@ -69,60 +37,20 @@ func newConfigurator(configClient configClientset.Interface, stop <-chan struct{
Update: announcements.MeshConfigUpdated,
Delete: announcements.MeshConfigDeleted,
}
informerCollection.meshConfig.AddEventHandler(k8s.GetEventHandlerFuncs(nil, meshConfigEventTypes, msgBroker))
informerCollection.meshConfig.AddEventHandler(c.metricsHandler())

informerCollection.AddEventHandler(informers.InformerKeyMeshConfig, k8s.GetEventHandlerFuncs(nil, meshConfigEventTypes, msgBroker))
informerCollection.AddEventHandler(informers.InformerKeyMeshConfig, c.metricsHandler())

meshRootCertificateEventTypes := k8s.EventTypes{
Add: announcements.MeshRootCertificateAdded,
Update: announcements.MeshRootCertificateUpdated,
Delete: announcements.MeshRootCertificateDeleted,
}
informerCollection.meshRootCertificate.AddEventHandler(k8s.GetEventHandlerFuncs(nil, meshRootCertificateEventTypes, msgBroker))

err := c.run(stop)
if err != nil {
return c, errors.Errorf("Could not start %s informer clients: %s", configv1alpha2.SchemeGroupVersion, err)
}
informerCollection.AddEventHandler(informers.InformerKeyMeshRootCertificate, k8s.GetEventHandlerFuncs(nil, meshRootCertificateEventTypes, msgBroker))

return c, nil
}

func (c *client) run(stop <-chan struct{}) error {
log.Info().Msgf("Starting informer clients for API group %s", configv1alpha2.SchemeGroupVersion)

if c.informers == nil {
return errors.New("config.openservicemesh.io informers not initialized")
}

sharedInformers := map[string]cache.SharedInformer{
"MeshConfig": c.informers.meshConfig,
"MeshRootCertificate": c.informers.meshRootCertificate,
}

var informerNames []string
var hasSynced []cache.InformerSynced
for name, informer := range sharedInformers {
if informer == nil {
log.Error().Msgf("Informer for '%s' not initialized, ignoring it", name) // TODO: log with errcode
continue
}
informerNames = append(informerNames, name)
log.Info().Msgf("Starting informer: %s", name)
go informer.Run(stop)
hasSynced = append(hasSynced, informer.HasSynced)
}

log.Info().Msgf("Waiting for informers %v caches to sync", informerNames)
if !cache.WaitForCacheSync(stop, hasSynced...) {
// TODO(#3962): metric might not be scraped before process restart resulting from this error
log.Error().Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrConfigInformerInitCache)).Msg("Failed initial cache sync for config.openservicemesh.io informers")
return errors.New("Failed initial cache sync for config.openservicemesh.io informers")
}

log.Info().Msgf("Cache sync finished for %v informers in API group %s", informerNames, configv1alpha2.SchemeGroupVersion)
return nil
}

func (c *client) getMeshConfigCacheKey() string {
return fmt.Sprintf("%s/%s", c.osmNamespace, c.meshConfigName)
}
Expand All @@ -132,7 +60,7 @@ func (c *client) getMeshConfig() configv1alpha2.MeshConfig {
var meshConfig configv1alpha2.MeshConfig

meshConfigCacheKey := c.getMeshConfigCacheKey()
item, exists, err := c.caches.meshConfig.GetByKey(meshConfigCacheKey)
item, exists, err := c.informers.GetByKey(informers.InformerKeyMeshConfig, meshConfigCacheKey)
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMeshConfigFetchFromCache)).Msgf("Error getting MeshConfig from cache with key %s", meshConfigCacheKey)
return meshConfig
Expand Down
20 changes: 8 additions & 12 deletions pkg/configurator/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

fakeConfig "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned/fake"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/metricsstore"

configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
Expand All @@ -23,7 +23,11 @@ func TestGetMeshConfig(t *testing.T) {

meshConfigClient := fakeConfig.NewSimpleClientset()
stop := make(chan struct{})
c, err := newConfigurator(meshConfigClient, stop, osmNamespace, osmMeshConfigName, nil)

ic, err := informers.NewInformerCollection("osm", stop, informers.WithConfigClient(meshConfigClient))
a.Nil(err)

c, err := newConfigurator(ic, osmNamespace, osmMeshConfigName, nil)
a.Nil(err)

// Returns empty MeshConfig if informer cache is empty
Expand All @@ -39,25 +43,17 @@ func TestGetMeshConfig(t *testing.T) {
Name: osmMeshConfigName,
},
}
err = c.caches.meshConfig.Add(newObj)
err = c.informers.Add(informers.InformerKeyMeshConfig, newObj, t)
a.Nil(err)
a.Equal(*newObj, c.getMeshConfig())
}

type store struct {
cache.Store
}

func (s *store) GetByKey(_ string) (interface{}, bool, error) {
return nil, false, nil
}

func TestMetricsHandler(t *testing.T) {
a := assert.New(t)

c := &client{
caches: &cacheCollection{meshConfig: &store{}},
meshConfigName: osmMeshConfigName,
informers: &informers.InformerCollection{},
}
handlers := c.metricsHandler()
metricsstore.DefaultMetricsStore.Start(metricsstore.DefaultMetricsStore.FeatureFlagEnabled)
Expand Down
19 changes: 14 additions & 5 deletions pkg/configurator/methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
testclient "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned/fake"
"github.com/openservicemesh/osm/pkg/k8s/informers"

"github.com/openservicemesh/osm/pkg/constants"
)
Expand All @@ -28,9 +29,12 @@ func TestGetMeshConfigCacheKey(t *testing.T) {
func TestCreateUpdateConfig(t *testing.T) {
t.Run("MeshConfig doesn't exist", func(t *testing.T) {
meshConfigClientSet := testclient.NewSimpleClientset()

stop := make(chan struct{})
cfg, err := newConfigurator(meshConfigClientSet, stop, osmNamespace, osmMeshConfigName, nil)

ic, err := informers.NewInformerCollection("osm", stop, informers.WithConfigClient(meshConfigClientSet))
tassert.Nil(t, err)

cfg, err := newConfigurator(ic, osmNamespace, osmMeshConfigName, nil)
tassert.Nil(t, err)
tassert.Equal(t, configv1alpha2.MeshConfig{}, cfg.getMeshConfig())
})
Expand Down Expand Up @@ -476,7 +480,11 @@ func TestCreateUpdateConfig(t *testing.T) {
// Create configurator
stop := make(chan struct{})
defer close(stop)
cfg, err := newConfigurator(meshConfigClientSet, stop, osmNamespace, osmMeshConfigName, nil)

ic, err := informers.NewInformerCollection("osm", stop, informers.WithConfigClient(meshConfigClientSet))
assert.Nil(err)

cfg, err := newConfigurator(ic, osmNamespace, osmMeshConfigName, nil)
assert.Nil(err)

meshConfig := configv1alpha2.MeshConfig{
Expand All @@ -487,7 +495,7 @@ func TestCreateUpdateConfig(t *testing.T) {
Spec: *test.initialMeshConfigData,
}

err = cfg.caches.meshConfig.Add(&meshConfig)
err = cfg.informers.Add(informers.InformerKeyMeshConfig, &meshConfig, t)
assert.Nil(err)

test.checkCreate(assert, cfg)
Expand All @@ -497,7 +505,8 @@ func TestCreateUpdateConfig(t *testing.T) {
}

meshConfig.Spec = *test.updatedMeshConfigData
err = cfg.caches.meshConfig.Update(&meshConfig)
err = cfg.informers.Update(informers.InformerKeyMeshConfig, &meshConfig, t)

assert.Nil(err)

test.checkUpdate(assert, cfg)
Expand Down
15 changes: 2 additions & 13 deletions pkg/configurator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
"github.com/openservicemesh/osm/pkg/k8s/informers"

"github.com/openservicemesh/osm/pkg/auth"
"github.com/openservicemesh/osm/pkg/logger"
Expand All @@ -17,21 +17,10 @@ var (
log = logger.New("configurator")
)

type informerCollection struct {
meshConfig cache.SharedIndexInformer
meshRootCertificate cache.SharedIndexInformer
}

type cacheCollection struct {
meshConfig cache.Store
meshRootCertificate cache.Store
}

// client is the type used to represent the Kubernetes client for the config.openservicemesh.io API group
type client struct {
osmNamespace string
informers *informerCollection
caches *cacheCollection
informers *informers.InformerCollection
meshConfigName string
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/envoy/sds/response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/openservicemesh/osm/pkg/envoy/secrets"
configFake "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned/fake"
"github.com/openservicemesh/osm/pkg/k8s/informers"

"github.com/openservicemesh/osm/pkg/catalog"
catalogFake "github.com/openservicemesh/osm/pkg/catalog/fake"
Expand Down Expand Up @@ -56,8 +57,10 @@ func TestNewResponse(t *testing.T) {
podID := uuid.New()

proxy := envoy.NewProxy(envoy.KindSidecar, podID, proxySvcAccount.ToServiceIdentity(), nil)
ic, err := informers.NewInformerCollection("osm", stop, informers.WithKubeClient(fakeKubeClient), informers.WithConfigClient(fakeConfigClient))
assert.Nil(err)

cfg, err := configurator.NewConfigurator(fakeConfigClient, stop, "-osm-namespace-", "-the-mesh-config-name-", nil)
cfg, err := configurator.NewConfigurator(ic, "-osm-namespace-", "-the-mesh-config-name-", nil)
assert.Nil(err)
certManager := tresorFake.NewFake(nil)
meshCatalog := catalogFake.NewFakeMeshCatalog(fakeKubeClient, fakeConfigClient)
Expand Down
Loading