Skip to content
This repository was archived by the owner on Jul 11, 2023. It is now read-only.

Commit 241e8ae

Browse files
authored
(k8s/informers): use InformerCollection for other clients (#4804)
Use the new InformerCollection instead of fragmenting informers across different clients. Signed-off-by: Keith Mattix II <[email protected]>
1 parent 7046cf2 commit 241e8ae

24 files changed

+347
-1053
lines changed

cmd/osm-bootstrap/osm-bootstrap.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/openservicemesh/osm/pkg/crdconversion"
3636
"github.com/openservicemesh/osm/pkg/httpserver"
3737
"github.com/openservicemesh/osm/pkg/k8s/events"
38+
"github.com/openservicemesh/osm/pkg/k8s/informers"
3839
"github.com/openservicemesh/osm/pkg/logger"
3940
"github.com/openservicemesh/osm/pkg/messaging"
4041
"github.com/openservicemesh/osm/pkg/metricsstore"
@@ -196,12 +197,18 @@ func main() {
196197

197198
msgBroker := messaging.NewBroker(stop)
198199

199-
// Initialize Configurator to watch resources in the config.openservicemesh.io API group
200-
cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker)
200+
informerCollection, err := informers.NewInformerCollection(meshName, stop,
201+
informers.WithKubeClient(kubeClient),
202+
informers.WithConfigClient(configClient),
203+
)
204+
201205
if err != nil {
202-
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
206+
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection")
203207
}
204208

209+
// Initialize Configurator to watch resources in the config.openservicemesh.io API group
210+
cfg := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)
211+
205212
certOpts, err := getCertOptions()
206213
if err != nil {
207214
log.Fatal().Err(err).Msg("Error getting certificate options")

cmd/osm-controller/osm-controller.go

+4-12
Original file line numberDiff line numberDiff line change
@@ -196,17 +196,12 @@ func main() {
196196
}
197197

198198
// This component will be watching resources in the config.openservicemesh.io API group
199-
cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker)
200-
if err != nil {
201-
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
202-
}
199+
cfg := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)
203200

204201
k8sClient := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker)
205202

206-
meshSpec, err := smi.NewMeshSpecClient(kubeConfig, osmNamespace, k8sClient, stop, msgBroker)
207-
if err != nil {
208-
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating MeshSpec")
209-
}
203+
meshSpec := smi.NewSMIClient(informerCollection, osmNamespace, k8sClient, msgBroker)
204+
210205
certOpts, err := getCertOptions()
211206
if err != nil {
212207
log.Fatal().Err(err).Msg("Error getting certificate options")
@@ -249,10 +244,7 @@ func main() {
249244
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating Ingress client")
250245
}
251246

252-
policyController, err := policy.NewPolicyController(k8sClient, policyClient, stop, msgBroker)
253-
if err != nil {
254-
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for policy.openservicemesh.io")
255-
}
247+
policyController := policy.NewPolicyController(informerCollection, k8sClient, msgBroker)
256248

257249
meshCatalog := catalog.NewMeshCatalog(
258250
k8sClient,

cmd/osm-injector/osm-injector.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,7 @@ func main() {
193193
}
194194

195195
// Initialize Configurator to watch resources in the config.openservicemesh.io API group
196-
cfg, err := configurator.NewConfigurator(configClientset.NewForConfigOrDie(kubeConfig), stop, osmNamespace, osmMeshConfigName, msgBroker)
197-
if err != nil {
198-
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
199-
}
196+
cfg := configurator.NewConfigurator(informerCollection, osmNamespace, osmMeshConfigName, msgBroker)
200197

201198
// Initialize kubernetes.Controller to watch kubernetes resources
202199
kubeController := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker, k8s.Namespaces)

pkg/catalog/fake/fake.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"k8s.io/client-go/kubernetes"
1111

1212
configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
13+
"github.com/openservicemesh/osm/pkg/k8s/informers"
1314

1415
"github.com/openservicemesh/osm/pkg/catalog"
1516
tresorFake "github.com/openservicemesh/osm/pkg/certificate/providers/tresor/fake"
@@ -46,10 +47,13 @@ func NewFakeMeshCatalog(kubeClient kubernetes.Interface, meshConfigClient config
4647

4748
osmNamespace := "-test-osm-namespace-"
4849
osmMeshConfigName := "-test-osm-mesh-config-"
49-
cfg, err := configurator.NewConfigurator(meshConfigClient, stop, osmNamespace, osmMeshConfigName, nil)
50+
ic, err := informers.NewInformerCollection("osm", stop, informers.WithKubeClient(kubeClient), informers.WithConfigClient(meshConfigClient))
5051
if err != nil {
5152
return nil
5253
}
54+
55+
cfg := configurator.NewConfigurator(ic, osmNamespace, osmMeshConfigName, nil)
56+
5357
certManager := tresorFake.NewFake(nil)
5458

5559
// #1683 tracks potential improvements to the following dynamic mocks

pkg/configurator/client.go

+13-92
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,10 @@ import (
44
"fmt"
55
"reflect"
66

7-
"github.com/pkg/errors"
8-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9-
"k8s.io/apimachinery/pkg/fields"
107
"k8s.io/client-go/tools/cache"
118

129
configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
13-
configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
14-
configInformers "github.com/openservicemesh/osm/pkg/gen/client/config/informers/externalversions"
10+
"github.com/openservicemesh/osm/pkg/k8s/informers"
1511

1612
"github.com/openservicemesh/osm/pkg/announcements"
1713
"github.com/openservicemesh/osm/pkg/errcode"
@@ -21,44 +17,9 @@ import (
2117
)
2218

2319
// NewConfigurator implements configurator.Configurator and creates the Kubernetes client to manage namespaces.
24-
func NewConfigurator(configClient configClientset.Interface, stop <-chan struct{}, osmNamespace, meshConfigName string,
25-
msgBroker *messaging.Broker) (Configurator, error) {
26-
return newConfigurator(configClient, stop, osmNamespace, meshConfigName, msgBroker)
27-
}
28-
29-
func newConfigurator(configClient configClientset.Interface, stop <-chan struct{}, osmNamespace string, meshConfigName string,
30-
msgBroker *messaging.Broker) (*client, error) {
31-
listOption := configInformers.WithTweakListOptions(func(opt *metav1.ListOptions) {
32-
opt.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, meshConfigName).String()
33-
})
34-
35-
meshConfigInformerFactory := configInformers.NewSharedInformerFactoryWithOptions(
36-
configClient,
37-
k8s.DefaultKubeEventResyncInterval,
38-
configInformers.WithNamespace(osmNamespace),
39-
listOption,
40-
)
41-
42-
// informerFactory without listOptions
43-
configInformerFactory := configInformers.NewSharedInformerFactoryWithOptions(
44-
configClient,
45-
k8s.DefaultKubeEventResyncInterval,
46-
configInformers.WithNamespace(osmNamespace),
47-
)
48-
49-
informerCollection := informerCollection{
50-
meshConfig: meshConfigInformerFactory.Config().V1alpha2().MeshConfigs().Informer(),
51-
meshRootCertificate: configInformerFactory.Config().V1alpha2().MeshRootCertificates().Informer(),
52-
}
53-
54-
cacheCollection := cacheCollection{
55-
meshConfig: informerCollection.meshConfig.GetStore(),
56-
meshRootCertificate: informerCollection.meshRootCertificate.GetStore(),
57-
}
58-
59-
c := &client{
60-
informers: &informerCollection,
61-
caches: &cacheCollection,
20+
func NewConfigurator(informerCollection *informers.InformerCollection, osmNamespace, meshConfigName string, msgBroker *messaging.Broker) *Client {
21+
c := &Client{
22+
informers: informerCollection,
6223
osmNamespace: osmNamespace,
6324
meshConfigName: meshConfigName,
6425
}
@@ -69,70 +30,30 @@ func newConfigurator(configClient configClientset.Interface, stop <-chan struct{
6930
Update: announcements.MeshConfigUpdated,
7031
Delete: announcements.MeshConfigDeleted,
7132
}
72-
informerCollection.meshConfig.AddEventHandler(k8s.GetEventHandlerFuncs(nil, meshConfigEventTypes, msgBroker))
73-
informerCollection.meshConfig.AddEventHandler(c.metricsHandler())
33+
34+
informerCollection.AddEventHandler(informers.InformerKeyMeshConfig, k8s.GetEventHandlerFuncs(nil, meshConfigEventTypes, msgBroker))
35+
informerCollection.AddEventHandler(informers.InformerKeyMeshConfig, c.metricsHandler())
7436

7537
meshRootCertificateEventTypes := k8s.EventTypes{
7638
Add: announcements.MeshRootCertificateAdded,
7739
Update: announcements.MeshRootCertificateUpdated,
7840
Delete: announcements.MeshRootCertificateDeleted,
7941
}
80-
informerCollection.meshRootCertificate.AddEventHandler(k8s.GetEventHandlerFuncs(nil, meshRootCertificateEventTypes, msgBroker))
81-
82-
err := c.run(stop)
83-
if err != nil {
84-
return c, errors.Errorf("Could not start %s informer clients: %s", configv1alpha2.SchemeGroupVersion, err)
85-
}
86-
87-
return c, nil
88-
}
89-
90-
func (c *client) run(stop <-chan struct{}) error {
91-
log.Info().Msgf("Starting informer clients for API group %s", configv1alpha2.SchemeGroupVersion)
92-
93-
if c.informers == nil {
94-
return errors.New("config.openservicemesh.io informers not initialized")
95-
}
96-
97-
sharedInformers := map[string]cache.SharedInformer{
98-
"MeshConfig": c.informers.meshConfig,
99-
"MeshRootCertificate": c.informers.meshRootCertificate,
100-
}
101-
102-
var informerNames []string
103-
var hasSynced []cache.InformerSynced
104-
for name, informer := range sharedInformers {
105-
if informer == nil {
106-
log.Error().Msgf("Informer for '%s' not initialized, ignoring it", name) // TODO: log with errcode
107-
continue
108-
}
109-
informerNames = append(informerNames, name)
110-
log.Info().Msgf("Starting informer: %s", name)
111-
go informer.Run(stop)
112-
hasSynced = append(hasSynced, informer.HasSynced)
113-
}
114-
115-
log.Info().Msgf("Waiting for informers %v caches to sync", informerNames)
116-
if !cache.WaitForCacheSync(stop, hasSynced...) {
117-
// TODO(#3962): metric might not be scraped before process restart resulting from this error
118-
log.Error().Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrConfigInformerInitCache)).Msg("Failed initial cache sync for config.openservicemesh.io informers")
119-
return errors.New("Failed initial cache sync for config.openservicemesh.io informers")
120-
}
42+
informerCollection.AddEventHandler(informers.InformerKeyMeshRootCertificate, k8s.GetEventHandlerFuncs(nil, meshRootCertificateEventTypes, msgBroker))
12143

122-
log.Info().Msgf("Cache sync finished for %v informers in API group %s", informerNames, configv1alpha2.SchemeGroupVersion)
123-
return nil
44+
return c
12445
}
12546

126-
func (c *client) getMeshConfigCacheKey() string {
47+
func (c *Client) getMeshConfigCacheKey() string {
12748
return fmt.Sprintf("%s/%s", c.osmNamespace, c.meshConfigName)
12849
}
12950

13051
// Returns the current MeshConfig
131-
func (c *client) getMeshConfig() configv1alpha2.MeshConfig {
52+
func (c *Client) getMeshConfig() configv1alpha2.MeshConfig {
13253
var meshConfig configv1alpha2.MeshConfig
13354

13455
meshConfigCacheKey := c.getMeshConfigCacheKey()
135-
item, exists, err := c.caches.meshConfig.GetByKey(meshConfigCacheKey)
56+
item, exists, err := c.informers.GetByKey(informers.InformerKeyMeshConfig, meshConfigCacheKey)
13657
if err != nil {
13758
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMeshConfigFetchFromCache)).Msgf("Error getting MeshConfig from cache with key %s", meshConfigCacheKey)
13859
return meshConfig
@@ -147,7 +68,7 @@ func (c *client) getMeshConfig() configv1alpha2.MeshConfig {
14768
return meshConfig
14869
}
14970

150-
func (c *client) metricsHandler() cache.ResourceEventHandlerFuncs {
71+
func (c *Client) metricsHandler() cache.ResourceEventHandlerFuncs {
15172
handleMetrics := func(obj interface{}) {
15273
config := obj.(*configv1alpha2.MeshConfig)
15374

pkg/configurator/client_test.go

+8-13
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import (
55

66
"github.com/stretchr/testify/assert"
77
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8-
"k8s.io/client-go/tools/cache"
98

109
fakeConfig "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned/fake"
10+
"github.com/openservicemesh/osm/pkg/k8s/informers"
1111
"github.com/openservicemesh/osm/pkg/metricsstore"
1212

1313
configv1alpha2 "github.com/openservicemesh/osm/pkg/apis/config/v1alpha2"
@@ -23,9 +23,12 @@ func TestGetMeshConfig(t *testing.T) {
2323

2424
meshConfigClient := fakeConfig.NewSimpleClientset()
2525
stop := make(chan struct{})
26-
c, err := newConfigurator(meshConfigClient, stop, osmNamespace, osmMeshConfigName, nil)
26+
27+
ic, err := informers.NewInformerCollection("osm", stop, informers.WithConfigClient(meshConfigClient))
2728
a.Nil(err)
2829

30+
c := NewConfigurator(ic, osmNamespace, osmMeshConfigName, nil)
31+
2932
// Returns empty MeshConfig if informer cache is empty
3033
a.Equal(configv1alpha2.MeshConfig{}, c.getMeshConfig())
3134

@@ -39,25 +42,17 @@ func TestGetMeshConfig(t *testing.T) {
3942
Name: osmMeshConfigName,
4043
},
4144
}
42-
err = c.caches.meshConfig.Add(newObj)
45+
err = c.informers.Add(informers.InformerKeyMeshConfig, newObj, t)
4346
a.Nil(err)
4447
a.Equal(*newObj, c.getMeshConfig())
4548
}
4649

47-
type store struct {
48-
cache.Store
49-
}
50-
51-
func (s *store) GetByKey(_ string) (interface{}, bool, error) {
52-
return nil, false, nil
53-
}
54-
5550
func TestMetricsHandler(t *testing.T) {
5651
a := assert.New(t)
5752

58-
c := &client{
59-
caches: &cacheCollection{meshConfig: &store{}},
53+
c := &Client{
6054
meshConfigName: osmMeshConfigName,
55+
informers: &informers.InformerCollection{},
6156
}
6257
handlers := c.metricsHandler()
6358
metricsstore.DefaultMetricsStore.Start(metricsstore.DefaultMetricsStore.FeatureFlagEnabled)

0 commit comments

Comments
 (0)