Skip to content

Add events count and export metrics #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Sep 20, 2024
Merged
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,21 @@ Alternatively, you can pass this configuration directly using the `--set` flag:
helm install kubewatch robusta/kubewatch --set='rbac.create=true,slack.channel=#YOUR_CHANNEL,slack.token=xoxb-YOUR_TOKEN,customRoles[0].apiGroups={monitoring.coreos.com},customRoles[0].resources={prometheusrules},customRoles[0].verbs={get,list,watch}'
```

#### Metrics
`kubewatch` runs a Prometheus metrics endpoint at `/metrics` on port `2112` by default. This endpoint can be used to monitor health and the performance of `kubewatch`.

The `kubewatch_events_total` metric can help track the total number of Kubernetes events, categorized by resource type (e.g., `Pods`, `Deployments`) and event type (e.g., `Create`, `Delete`).

You can change the default port (`2112`) on which the metrics server listens by setting the `LISTEN_ADDRESS` environment variable.
Format is `host:port`. `:5454` means any host, and port `5454`


```yaml
extraEnvVars:
- name: LISTEN_ADDRESS
value: ":5454"
```

### Local Installation
#### Using go package installer:

Expand Down
3 changes: 1 addition & 2 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package cmd

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"

Expand Down Expand Up @@ -94,7 +93,7 @@ var configViewCmd = &cobra.Command{
Display the contents of the contents of ~/.kubewatch.yaml`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Fprintln(os.Stderr, "Contents of ~/.kubewatch.yaml")
configFile, err := ioutil.ReadFile(filepath.Join(os.Getenv("HOME"), kubewatchConfigFile))
configFile, err := os.ReadFile(filepath.Join(os.Getenv("HOME"), kubewatchConfigFile))
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ limitations under the License.
package config

import (
"io/ioutil"
"io"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -237,7 +237,7 @@ func (c *Config) Load() error {
return err
}

b, err := ioutil.ReadAll(file)
b, err := io.ReadAll(file)
if err != nil {
return err
}
Expand Down
6 changes: 1 addition & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/magiconair/properties v1.7.4 // indirect
github.com/mkmik/multierror v0.3.0
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pelletier/go-toml v1.0.1 // indirect
github.com/prometheus/client_golang v1.20.3
github.com/segmentio/textio v1.2.0
github.com/sirupsen/logrus v1.6.0
github.com/slack-go/slack v0.6.5
Expand All @@ -19,10 +19,6 @@ require (
github.com/spf13/jwalterweatherman v0.0.0-20180109140146-7c0cea34c8ec // indirect
github.com/spf13/viper v1.0.0
github.com/tbruyelle/hipchat-go v0.0.0-20160921153256-749fb9e14beb
golang.org/x/net v0.23.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.20.15
k8s.io/apimachinery v0.20.15
Expand Down
1,436 changes: 1,408 additions & 28 deletions go.sum

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions pkg/client/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ limitations under the License.
package client

import (
"net/http"
"os"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/bitnami-labs/kubewatch/config"
"github.com/bitnami-labs/kubewatch/pkg/controller"
"github.com/bitnami-labs/kubewatch/pkg/handlers"
Expand All @@ -35,6 +39,18 @@ import (

// Run runs the event loop processing with given handler
func Run(conf *config.Config) {
listenAddress := os.Getenv("LISTEN_ADDRESS")
if listenAddress == "" {
listenAddress = ":2112"
}

go func() {
http.Handle("/metrics", promhttp.Handler())
logrus.Infof("Starting metrics server on port %s", listenAddress)
if err := http.ListenAndServe(listenAddress, nil); err != nil {
logrus.Errorf("Error starting metrics server on port %s: %v", listenAddress, err)
}
}()

var eventHandler = ParseEventHandler(conf)
controller.Start(conf, eventHandler)
Expand Down
61 changes: 39 additions & 22 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const maxRetries = 5
Expand Down Expand Up @@ -93,6 +96,14 @@ func objName(obj interface{}) string {
func Start(conf *config.Config, eventHandler handlers.Handler) {
var kubeClient kubernetes.Interface
var dynamicClient dynamic.Interface

kubewatchEventsMetrics := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "kubewatch_events_total",
Help: "The total number of Kubernetes events observed by Kubewatch, labeled by resource and event type",
},
[]string{"resourceType", "eventType"},
)

if _, err := rest.InClusterConfig(); err != nil {
kubeClient = utils.GetClientOutOfCluster()
Expand Down Expand Up @@ -120,7 +131,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1)
allCoreEventsController := newResourceController(kubeClient, eventHandler, allCoreEventsInformer, objName(api_v1.Event{}), V1, kubewatchEventsMetrics)
stopAllCoreEventsCh := make(chan struct{})
defer close(stopAllCoreEventsCh)

Expand All @@ -144,7 +155,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1)
allEventsController := newResourceController(kubeClient, eventHandler, allEventsInformer, objName(events_v1.Event{}), EVENTS_V1, kubewatchEventsMetrics)
stopAllEventsCh := make(chan struct{})
defer close(stopAllEventsCh)

Expand All @@ -166,7 +177,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Pod{}), V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -188,7 +199,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(autoscaling_v1.HorizontalPodAutoscaler{}), AUTOSCALING_V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -211,7 +222,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.DaemonSet{}), APPS_V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -233,7 +244,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.StatefulSet{}), APPS_V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -255,7 +266,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.ReplicaSet{}), APPS_V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -277,7 +288,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Service{}), V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -299,7 +310,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(apps_v1.Deployment{}), APPS_V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -321,7 +332,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Namespace{}), V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -343,7 +354,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ReplicationController{}), V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -365,7 +376,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(batch_v1.Job{}), BATCH_V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -387,7 +398,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Node{}), V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -409,7 +420,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ServiceAccount{}), V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -431,7 +442,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRole{}), RBAC_V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -453,7 +464,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(rbac_v1.ClusterRoleBinding{}), RBAC_V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -475,7 +486,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.PersistentVolume{}), V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -497,7 +508,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.Secret{}), V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -519,7 +530,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(api_v1.ConfigMap{}), V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -541,7 +552,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1)
c := newResourceController(kubeClient, eventHandler, informer, objName(networking_v1.Ingress{}), NETWORKING_V1, kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand Down Expand Up @@ -572,7 +583,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
cache.Indexers{},
)

c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version))
c := newResourceController(kubeClient, eventHandler, informer, crd.Resource, fmt.Sprintf("%s/%s", crd.Group, crd.Version), kubewatchEventsMetrics)
stopCh := make(chan struct{})
defer close(stopCh)

Expand All @@ -585,7 +596,7 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
<-sigterm
}

func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string) *Controller {
func newResourceController(client kubernetes.Interface, eventHandler handlers.Handler, informer cache.SharedIndexInformer, resourceType string, apiVersion string, kubewatchEventsMetrics *prometheus.CounterVec) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
var newEvent Event
var err error
Expand All @@ -605,6 +616,8 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
if err == nil {
queue.Add(newEvent)
}

kubewatchEventsMetrics.WithLabelValues(resourceType, "create").Inc()
},
UpdateFunc: func(old, new interface{}) {
var ok bool
Expand All @@ -625,6 +638,8 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
if err == nil {
queue.Add(newEvent)
}

kubewatchEventsMetrics.WithLabelValues(resourceType, "update").Inc()
},
DeleteFunc: func(obj interface{}) {
var ok bool
Expand All @@ -641,6 +656,8 @@ func newResourceController(client kubernetes.Interface, eventHandler handlers.Ha
if err == nil {
queue.Add(newEvent)
}

kubewatchEventsMetrics.WithLabelValues(resourceType, "delete").Inc()
},
})

Expand Down
4 changes: 2 additions & 2 deletions pkg/handlers/msteam/msteam.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"encoding/json"
"fmt"
"github.com/sirupsen/logrus"
"io/ioutil"
"io"
"net/http"
"os"

Expand Down Expand Up @@ -98,7 +98,7 @@ func sendCard(ms *MSTeams, card *TeamsMessageCard) (*http.Response, error) {
ms.TeamsWebhookURL, err)
}
if res.StatusCode != http.StatusOK {
resMessage, err := ioutil.ReadAll(res.Body)
resMessage, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("Failed reading Teams http response: %v", err)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/handlers/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"crypto/x509"
"fmt"
"github.com/sirupsen/logrus"
"io/ioutil"
"os"

"bytes"
Expand Down Expand Up @@ -88,7 +87,7 @@ func (m *Webhook) Init(c *config.Config) error {
if cert == "" {
logrus.Printf("No webhook cert is given")
} else {
caCert, err := ioutil.ReadFile(cert)
caCert, err := os.ReadFile(cert)
if err != nil {
logrus.Printf("%s\n", err)
return err
Expand Down
Loading
Loading