forked from openservicemesh/osm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathosm-injector.go
299 lines (251 loc) · 12.4 KB
/
osm-injector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
// Package main implements the main entrypoint for osm-injector and utility routines to
// bootstrap the various internal components of osm-injector.
// osm-injector provides the automatic sidecar injection capability in OSM.
package main
import (
"context"
"flag"
"fmt"
"os"
"time"
"github.com/pkg/errors"
smiAccessClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/access/clientset/versioned"
smiTrafficSpecClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/specs/clientset/versioned"
smiTrafficSplitClient "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
"github.com/spf13/pflag"
admissionv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
configClientset "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned"
policyClientset "github.com/openservicemesh/osm/pkg/gen/client/policy/clientset/versioned"
"github.com/openservicemesh/osm/pkg/certificate/providers"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/httpserver"
"github.com/openservicemesh/osm/pkg/injector"
"github.com/openservicemesh/osm/pkg/k8s"
"github.com/openservicemesh/osm/pkg/k8s/events"
"github.com/openservicemesh/osm/pkg/k8s/informers"
"github.com/openservicemesh/osm/pkg/logger"
"github.com/openservicemesh/osm/pkg/messaging"
"github.com/openservicemesh/osm/pkg/metricsstore"
"github.com/openservicemesh/osm/pkg/reconciler"
"github.com/openservicemesh/osm/pkg/signals"
"github.com/openservicemesh/osm/pkg/version"
)
// Package-level configuration for osm-injector. Most of these values are bound to
// command-line flags in init() and read by main() and its helpers.
var (
// verbosity is the log level name, bound to --verbosity / -v and applied via logger.SetLogLevel.
verbosity string
meshName string // An ID that uniquely identifies an OSM instance
// kubeConfigFile is the path to the Kubernetes config file, bound to --kubeconfig.
kubeConfigFile string
// osmNamespace is the namespace OSM belongs to, bound to --osm-namespace.
osmNamespace string
// webhookConfigName is the name of the MutatingWebhookConfiguration, bound to --webhook-config-name.
webhookConfigName string
// caBundleSecretName is the name of the Kubernetes Secret for the OSM CA bundle,
// bound to --ca-bundle-secret-name; getCertOptions copies it into tresorOptions.SecretName.
caBundleSecretName string
// osmMeshConfigName is the name of the OSM MeshConfig, bound to --osm-config-name.
osmMeshConfigName string
// webhookTimeout is the MutatingWebhookConfiguration timeout, bound to --webhook-timeout.
// NOTE(review): the unit is not stated in this file (presumably seconds) — confirm against the injector package.
webhookTimeout int32
// osmVersion is the OSM version string, bound to --osm-version.
osmVersion string
// injectorConfig holds the sidecar injector settings; its ListenPort is bound to --webhook-port.
injectorConfig injector.Config
// certProviderKind selects the certificate provider, bound to --certificate-manager.
certProviderKind string
// enableReconciler toggles creation of the reconciler client in main(), bound to --enable-reconciler.
enableReconciler bool
// tresorOptions are the Tresor provider options; no flags are bound to it in this file,
// only SecretName is filled in by getCertOptions.
tresorOptions providers.TresorOptions
// vaultOptions are the Vault provider options, bound to the --vault-* flags.
vaultOptions providers.VaultOptions
// certManagerOptions are the cert-manager provider options, bound to the --cert-manager-issuer-* flags.
certManagerOptions providers.CertManagerOptions
// osmContainerPullPolicy is the pull policy for injected init and healthcheck containers,
// bound to --osm-container-pull-policy and converted to corev1.PullPolicy in main().
osmContainerPullPolicy string
// scheme is the runtime scheme that init() registers the client-go and admissionregistration/v1 types into.
scheme = runtime.NewScheme()
)
var (
// flags is the pflag flag set for osm-injector; it is created with pflag.ExitOnError
// and populated in init(), then parsed by parseFlags().
flags = pflag.NewFlagSet(`osm-injector`, pflag.ExitOnError)
// log is the logger used throughout this file, created under the name "osm-injector/main".
log = logger.New("osm-injector/main")
)
// init binds every osm-injector command-line flag to its package-level variable and
// registers the client-go and admissionregistration/v1 types into the package scheme.
// Flags are only defined here; they are parsed later by parseFlags().
func init() {
	flags.StringVarP(&verbosity, "verbosity", "v", "info", "Set log verbosity level")
	flags.StringVar(&meshName, "mesh-name", "", "OSM mesh name")
	flags.StringVar(&kubeConfigFile, "kubeconfig", "", "Path to Kubernetes config file.")
	flags.StringVar(&osmNamespace, "osm-namespace", "", "Namespace to which OSM belongs to.")
	flags.StringVar(&webhookConfigName, "webhook-config-name", "", "Name of the MutatingWebhookConfiguration to be configured by osm-injector")
	flags.Int32Var(&webhookTimeout, "webhook-timeout", int32(20), "Timeout of the MutatingWebhookConfiguration")
	flags.StringVar(&osmMeshConfigName, "osm-config-name", "osm-mesh-config", "Name of the OSM MeshConfig")
	flags.StringVar(&osmVersion, "osm-version", "", "Version of OSM")

	// sidecar injector options
	flags.IntVar(&injectorConfig.ListenPort, "webhook-port", constants.InjectorWebhookPort, "Webhook port for sidecar-injector")

	// Generic certificate manager/provider options
	flags.StringVar(&certProviderKind, "certificate-manager", providers.TresorKind.String(), fmt.Sprintf("Certificate manager, one of [%v]", providers.ValidCertificateProviders))
	flags.StringVar(&caBundleSecretName, "ca-bundle-secret-name", "", "Name of the Kubernetes Secret for the OSM CA bundle")

	// Vault certificate manager/provider options
	flags.StringVar(&vaultOptions.VaultProtocol, "vault-protocol", "http", "Protocol used to connect to the Hashi Vault")
	flags.StringVar(&vaultOptions.VaultHost, "vault-host", "vault.default.svc.cluster.local", "Host name of the Hashi Vault")
	flags.StringVar(&vaultOptions.VaultToken, "vault-token", "", "Secret token for the Hashi Vault")
	flags.StringVar(&vaultOptions.VaultRole, "vault-role", "openservicemesh", "Name of the Vault role dedicated to Open Service Mesh")
	flags.IntVar(&vaultOptions.VaultPort, "vault-port", 8200, "Port of the Hashi Vault")

	// Cert-manager certificate manager/provider options
	flags.StringVar(&certManagerOptions.IssuerName, "cert-manager-issuer-name", "osm-ca", "cert-manager issuer name")
	flags.StringVar(&certManagerOptions.IssuerKind, "cert-manager-issuer-kind", "Issuer", "cert-manager issuer kind")
	flags.StringVar(&certManagerOptions.IssuerGroup, "cert-manager-issuer-group", "cert-manager.io", "cert-manager issuer group")

	// Reconciler options
	flags.BoolVar(&enableReconciler, "enable-reconciler", false, "Enable reconciler for CRDs, mutating webhook and validating webhook")

	flags.StringVar(&osmContainerPullPolicy, "osm-container-pull-policy", "", "The pullPolicy to use for injected init and healthcheck containers")

	// Scheme registration is best-effort: the returned errors are deliberately
	// discarded so that a registration problem does not abort process start-up.
	_ = clientgoscheme.AddToScheme(scheme)
	_ = admissionv1.AddToScheme(scheme)
}
// getCertOptions maps the --certificate-manager flag value onto the options struct of the
// matching certificate provider. For the Tresor provider the CA bundle secret name is copied
// into the options before they are returned. An unrecognized kind yields a nil options value
// and an error naming the offending kind.
//
// TODO(#4502): This function can be deleted once we get rid of cert options.
func getCertOptions() (providers.Options, error) {
	kind := providers.Kind(certProviderKind)

	switch kind {
	case providers.VaultKind:
		return vaultOptions, nil
	case providers.CertManagerKind:
		return certManagerOptions, nil
	case providers.TresorKind:
		tresorOptions.SecretName = caBundleSecretName
		return tresorOptions, nil
	default:
		return nil, fmt.Errorf("unknown certificate provider kind: %s", certProviderKind)
	}
}
// main is the entrypoint of osm-injector. In order, it:
//  1. parses the command-line flags and applies the requested log level;
//  2. builds the Kubernetes, policy, and config clients from the kubeconfig;
//  3. looks up the injector's own pod and initializes the generic event recorder with it;
//  4. validates the CLI parameters;
//  5. starts the metrics store, message broker, informers, configurator, and certificate manager;
//  6. creates the sidecar injector mutating webhook and the metrics/version HTTP server;
//  7. optionally creates the reconciler client, then blocks until the stop channel fires.
//
// Any failure during start-up is reported through a fatal log or a fatal event.
func main() {
	log.Info().Msgf("Starting osm-injector %s; %s; %s", version.Version, version.GitCommit, version.BuildDate)
	if err := parseFlags(); err != nil {
		log.Fatal().Err(err).Msg("Error parsing cmd line arguments")
	}

	if err := logger.SetLogLevel(verbosity); err != nil {
		log.Fatal().Err(err).Msg("Error setting log level")
	}

	// Initialize kube config and client
	kubeConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigFile)
	if err != nil {
		log.Fatal().Err(err).Msgf("Error creating kube config (kubeconfig=%s)", kubeConfigFile)
	}
	kubeClient := kubernetes.NewForConfigOrDie(kubeConfig)
	policyClient := policyClientset.NewForConfigOrDie(kubeConfig)
	configClient := configClientset.NewForConfigOrDie(kubeConfig)

	// Initialize the generic Kubernetes event recorder and associate it with the osm-injector pod resource
	injectorPod, err := getInjectorPod(kubeClient)
	if err != nil {
		// Attach the underlying error so the cause of the failure is visible in the log.
		log.Fatal().Err(err).Msg("Error fetching osm-injector pod")
	}
	eventRecorder := events.GenericEventRecorder()
	if err := eventRecorder.Initialize(injectorPod, kubeClient, osmNamespace); err != nil {
		log.Fatal().Err(err).Msg("Error initializing generic event recorder")
	}

	// This ensures CLI parameters (and dependent values) are correct.
	// It runs after the event recorder is initialized because failures are reported as events.
	if err := validateCLIParams(); err != nil {
		events.GenericEventRecorder().FatalEvent(err, events.InvalidCLIParameters, "Error validating CLI parameters")
	}

	stop := signals.RegisterExitHandlers()

	// Start the default metrics store
	metricsstore.DefaultMetricsStore.Start(
		metricsstore.DefaultMetricsStore.CertIssuedCount,
		metricsstore.DefaultMetricsStore.CertIssuedTime,
		metricsstore.DefaultMetricsStore.ErrCodeCounter,
		metricsstore.DefaultMetricsStore.HTTPResponseTotal,
		metricsstore.DefaultMetricsStore.HTTPResponseDuration,
		metricsstore.DefaultMetricsStore.AdmissionWebhookResponseTotal,
		metricsstore.DefaultMetricsStore.ReconciliationTotal,
	)

	msgBroker := messaging.NewBroker(stop)

	smiTrafficSplitClientSet := smiTrafficSplitClient.NewForConfigOrDie(kubeConfig)
	smiTrafficSpecClientSet := smiTrafficSpecClient.NewForConfigOrDie(kubeConfig)
	smiTrafficTargetClientSet := smiAccessClient.NewForConfigOrDie(kubeConfig)

	informerCollection, err := informers.NewInformerCollection(meshName, stop,
		informers.WithKubeClient(kubeClient),
		informers.WithSMIClients(smiTrafficSplitClientSet, smiTrafficSpecClientSet, smiTrafficTargetClientSet),
		informers.WithConfigClient(configClient),
		informers.WithPolicyClient(policyClient),
	)
	if err != nil {
		events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating informer collection")
	}

	// Initialize Configurator to watch resources in the config.openservicemesh.io API group.
	// The config clientset created above is reused rather than building a second one.
	cfg, err := configurator.NewConfigurator(configClient, stop, osmNamespace, osmMeshConfigName, msgBroker)
	if err != nil {
		events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating controller for config.openservicemesh.io")
	}

	// Initialize kubernetes.Controller to watch kubernetes resources
	kubeController := k8s.NewKubernetesController(informerCollection, policyClient, msgBroker, k8s.Namespaces)

	certOpts, err := getCertOptions()
	if err != nil {
		log.Fatal().Err(err).Msg("Error getting certificate options")
	}

	// Initialize certificate manager/provider
	certManager, err := providers.NewCertificateManager(kubeClient, kubeConfig, cfg, osmNamespace,
		certOpts, msgBroker)
	if err != nil {
		events.GenericEventRecorder().FatalEvent(err, events.InvalidCertificateManager,
			"Error initializing certificate manager of kind %s", certProviderKind)
	}

	// watch for certificate rotation
	certManager.Start(5*time.Second, stop)

	// Initialize the sidecar injector webhook
	if err := injector.NewMutatingWebhook(injectorConfig, kubeClient, certManager, kubeController, meshName, osmNamespace, webhookConfigName, osmVersion, webhookTimeout, enableReconciler, stop, cfg, corev1.PullPolicy(osmContainerPullPolicy)); err != nil {
		events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating sidecar injector webhook")
	}

	version.SetMetric()

	/*
	 * Initialize osm-injector's HTTP server
	 */
	httpServer := httpserver.NewHTTPServer(constants.OSMHTTPServerPort)
	// Metrics
	httpServer.AddHandler(constants.MetricsPath, metricsstore.DefaultMetricsStore.Handler())
	// Version
	httpServer.AddHandler(constants.VersionPath, version.GetVersionHandler())
	// Start HTTP server
	err = httpServer.Start()
	if err != nil {
		log.Fatal().Err(err).Msgf("Failed to start OSM metrics/probes HTTP server")
	}

	// Start the global log level watcher that updates the log level dynamically
	go k8s.WatchAndUpdateLogLevel(msgBroker, stop)

	if enableReconciler {
		log.Info().Msgf("OSM reconciler enabled for sidecar injector webhook")
		err = reconciler.NewReconcilerClient(kubeClient, nil, meshName, osmVersion, stop, reconciler.MutatingWebhookInformerKey)
		if err != nil {
			events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error creating reconciler client to reconcile sidecar injector webhook")
		}
	}

	// Block until the exit handlers registered above signal shutdown.
	<-stop
	log.Info().Msgf("Stopping osm-injector %s; %s; %s", version.Version, version.GitCommit, version.BuildDate)
}
// parseFlags parses the process arguments into the package-level pflag flag set and
// returns any parse error.
//
// The program name (os.Args[0]) is not passed to the parser: the flag set expects only
// the arguments that follow the command name.
func parseFlags() error {
	if err := flags.Parse(os.Args[1:]); err != nil {
		return err
	}

	// Mark the standard library flag set as parsed as well. An empty argument list is
	// supplied, so there is nothing to parse and the returned error is discarded.
	// NOTE(review): presumably this is for dependencies that register flags on
	// flag.CommandLine and check whether it has been parsed — confirm.
	_ = flag.CommandLine.Parse([]string{})
	return nil
}
// getInjectorPod looks up the pod that osm-injector itself is running in.
//
// The pod name is read from the INJECTOR_POD_NAME environment variable and the pod is
// fetched from the OSM namespace using kubeClient. An error is returned when the variable
// is empty or when the pod cannot be retrieved; in the latter case the failure is also
// logged together with its error code.
func getInjectorPod(kubeClient kubernetes.Interface) (*corev1.Pod, error) {
	injectorPodName := os.Getenv("INJECTOR_POD_NAME")
	if len(injectorPodName) == 0 {
		return nil, errors.New("INJECTOR_POD_NAME env variable cannot be empty")
	}

	podsInNamespace := kubeClient.CoreV1().Pods(osmNamespace)
	injectorPod, getErr := podsInNamespace.Get(context.TODO(), injectorPodName, metav1.GetOptions{})
	if getErr == nil {
		return injectorPod, nil
	}

	// TODO(#3962): metric might not be scraped before process restart resulting from this error
	errCode := errcode.GetErrCodeWithMetric(errcode.ErrFetchingInjectorPod)
	log.Error().Err(getErr).Str(errcode.Kind, errCode).
		Msgf("Error retrieving osm-injector pod %s", injectorPodName)
	return nil, getErr
}
// validateCLIParams verifies that the mandatory command-line flags were supplied.
// The mesh name, OSM namespace, webhook configuration name and CA bundle secret name
// are checked in that order; the first one found empty produces an error telling the
// user which flag to set. It returns nil when all of them are non-empty.
func validateCLIParams() error {
	switch {
	case meshName == "":
		return errors.New("Please specify the mesh name using --mesh-name")
	case osmNamespace == "":
		return errors.New("Please specify the OSM namespace using --osm-namespace")
	case webhookConfigName == "":
		return errors.Errorf("Please specify the mutatingwebhookconfiguration name using --webhook-config-name value")
	case caBundleSecretName == "":
		return errors.Errorf("Please specify the CA bundle secret name using --ca-bundle-secret-name")
	}

	return nil
}