Skip to content

Commit 126d478

Browse files
authored
feat: separate port for hook metrics (#190)
* feat: separate port for hook metrics - hook-metrics-listen-port flag to specify a listen port for hook metrics - fix labels cardinality for kube_event_manager metrics
1 parent 26c78c9 commit 126d478

File tree

6 files changed

+94
-17
lines changed

6 files changed

+94
-17
lines changed

pkg/app/app.go

+6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ var TempDir = "/tmp/shell-operator"
1616

1717
var ListenAddress = "0.0.0.0"
1818
var ListenPort = "9115"
19+
var HookMetricsListenPort = ""
1920

2021
var PrometheusMetricsPrefix = "shell_operator_"
2122

@@ -45,6 +46,11 @@ func DefineStartCommandFlags(kpApp *kingpin.Application, cmd *kingpin.CmdClause)
4546
Default(PrometheusMetricsPrefix).
4647
StringVar(&PrometheusMetricsPrefix)
4748

49+
cmd.Flag("hook-metrics-listen-port", "Port to use to serve hooks’ custom metrics to Prometheus. Can be set with $SHELL_OPERATOR_HOOK_METRICS_LISTEN_PORT. Equal to listen-port if empty.").
50+
Envar("SHELL_OPERATOR_HOOK_METRICS_LISTEN_PORT").
51+
Default(HookMetricsListenPort).
52+
StringVar(&HookMetricsListenPort)
53+
4854
DefineKubeClientFlags(cmd)
4955
DefineJqFlags(cmd)
5056
DefineLoggingFlags(cmd)

pkg/hook/hook_manager.go

+1
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ func (hm *hookManager) loadHook(hookPath string) (hook *Hook, err error) {
146146
kubeCfg.Monitor.Metadata.MetricLabels = map[string]string{
147147
"hook": hook.Name,
148148
"binding": kubeCfg.BindingName,
149+
"queue": kubeCfg.Queue,
149150
}
150151
}
151152

pkg/metric_storage/metric_storage.go

+31-4
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package metric_storage
33
import (
44
"context"
55
"fmt"
6+
"net/http"
67
"strings"
78
"sync"
89

910
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/prometheus/client_golang/prometheus/promhttp"
1012
log "github.com/sirupsen/logrus"
1113

1214
"github.com/flant/shell-operator/pkg/metric_storage/operation"
@@ -38,16 +40,24 @@ type MetricStorage struct {
3840
histogramsLock sync.RWMutex
3941

4042
GroupedVault *vault.GroupedVault
43+
44+
Registry *prometheus.Registry
45+
Gatherer prometheus.Gatherer
46+
Registerer prometheus.Registerer
4147
}
4248

4349
func NewMetricStorage() *MetricStorage {
44-
return &MetricStorage{
50+
m := &MetricStorage{
4551
Gauges: make(map[string]*prometheus.GaugeVec),
4652
Counters: make(map[string]*prometheus.CounterVec),
4753
Histograms: make(map[string]*prometheus.HistogramVec),
4854
HistogramBuckets: make(map[string][]float64),
4955
GroupedVault: vault.NewGroupedVault(),
56+
Gatherer: prometheus.DefaultGatherer,
57+
Registerer: prometheus.DefaultRegisterer,
5058
}
59+
m.GroupedVault.Registerer = m.Registerer
60+
return m
5161
}
5262

5363
func (m *MetricStorage) WithContext(ctx context.Context) {
@@ -58,6 +68,13 @@ func (m *MetricStorage) WithPrefix(prefix string) {
5868
m.Prefix = prefix
5969
}
6070

71+
func (m *MetricStorage) WithNewRegistry() {
72+
m.Registry = prometheus.NewRegistry()
73+
m.Gatherer = m.Registry
74+
m.Registerer = m.Registry
75+
m.GroupedVault.Registerer = m.Registry
76+
}
77+
6178
func (m *MetricStorage) Stop() {
6279
if m.cancel != nil {
6380
m.cancel()
@@ -148,7 +165,7 @@ func (m *MetricStorage) RegisterGauge(metric string, labels map[string]string) *
148165
},
149166
LabelNames(labels),
150167
)
151-
prometheus.MustRegister(vec)
168+
m.Registerer.MustRegister(vec)
152169
m.Gauges[metric] = vec
153170
return vec
154171
}
@@ -209,7 +226,7 @@ func (m *MetricStorage) RegisterCounter(metric string, labels map[string]string)
209226
},
210227
LabelNames(labels),
211228
)
212-
prometheus.MustRegister(vec)
229+
m.Registerer.MustRegister(vec)
213230
m.Counters[metric] = vec
214231
return vec
215232
}
@@ -277,7 +294,7 @@ func (m *MetricStorage) RegisterHistogram(metric string, labels map[string]strin
277294
Buckets: buckets,
278295
}, LabelNames(labels))
279296

280-
prometheus.MustRegister(vec)
297+
m.Registerer.MustRegister(vec)
281298
m.Histograms[metric] = vec
282299
return vec
283300
}
@@ -382,3 +399,13 @@ func (m *MetricStorage) ApplyGroupOperations(group string, ops []operation.Metri
382399
}
383400
}
384401
}
402+
403+
func (m *MetricStorage) Handler() http.Handler {
404+
if m.Registry == nil {
405+
return promhttp.Handler()
406+
} else {
407+
return promhttp.HandlerFor(m.Registry, promhttp.HandlerOpts{
408+
Registry: m.Registry,
409+
})
410+
}
411+
}

pkg/metric_storage/vault/vault.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
type GroupedVault struct {
1414
collectors map[string]ConstMetricCollector
1515
mtx sync.Mutex
16+
Registerer prometheus.Registerer
1617
}
1718

1819
func NewGroupedVault() *GroupedVault {
@@ -38,7 +39,7 @@ func (v *GroupedVault) GetOrCreateCounterCollector(name string, labelNames []str
3839
collector, ok := v.collectors[name]
3940
if !ok {
4041
collector = NewConstCounterCollector(name, labelNames)
41-
if err := prometheus.Register(collector); err != nil {
42+
if err := v.Registerer.Register(collector); err != nil {
4243
return nil, fmt.Errorf("counter '%s' %v registration: %v", name, labelNames, err)
4344
}
4445
v.collectors[name] = collector
@@ -55,7 +56,7 @@ func (v *GroupedVault) GetOrCreateGaugeCollector(name string, labelNames []strin
5556
collector, ok := v.collectors[name]
5657
if !ok {
5758
collector = NewConstGaugeCollector(name, labelNames)
58-
if err := prometheus.Register(collector); err != nil {
59+
if err := v.Registerer.Register(collector); err != nil {
5960
return nil, fmt.Errorf("gauge '%s' %v registration: %v", name, labelNames, err)
6061
}
6162
v.collectors[name] = collector

pkg/metric_storage/vault/vault_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func Test_CounterAdd(t *testing.T) {
1919
log.SetOutput(buf)
2020

2121
v := NewGroupedVault()
22+
v.Registerer = prometheus.DefaultRegisterer
2223

2324
v.CounterAdd("group1", "metric_total", 1.0, map[string]string{"lbl": "val"})
2425

pkg/shell-operator/operator.go

+52-11
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"time"
1111

1212
"github.com/go-chi/chi"
13-
"github.com/prometheus/client_golang/prometheus/promhttp"
1413
log "github.com/sirupsen/logrus"
1514
uuid "gopkg.in/satori/go.uuid.v1"
1615

@@ -45,7 +44,9 @@ type ShellOperator struct {
4544
TempDir string
4645

4746
MetricStorage *metric_storage.MetricStorage
48-
KubeClient kube.KubernetesClient
47+
// separate metric storage for hook metrics if separate listen port is configured
48+
HookMetricStorage *metric_storage.MetricStorage
49+
KubeClient kube.KubernetesClient
4950

5051
ScheduleManager schedule_manager.ScheduleManager
5152
KubeEventsManager kube_events_manager.KubeEventsManager
@@ -90,11 +91,11 @@ func (op *ShellOperator) WithMetricStorage(metricStorage *metric_storage.MetricS
9091
op.MetricStorage = metricStorage
9192
}
9293

94+
// InitMetricStorage creates default MetricStorage object if not set earlier.
9395
func (op *ShellOperator) InitMetricStorage() {
9496
if op.MetricStorage != nil {
9597
return
9698
}
97-
// Metric storage.
9899
metricStorage := metric_storage.NewMetricStorage()
99100
metricStorage.WithContext(op.ctx)
100101
metricStorage.WithPrefix(app.PrometheusMetricsPrefix)
@@ -103,6 +104,20 @@ func (op *ShellOperator) InitMetricStorage() {
103104
op.MetricStorage = metricStorage
104105
}
105106

107+
// InitHookMetricStorage creates MetricStorage object
108+
// with new registry to scrape hook metrics on separate port.
109+
func (op *ShellOperator) InitHookMetricStorage() {
110+
if op.HookMetricStorage != nil {
111+
return
112+
}
113+
metricStorage := metric_storage.NewMetricStorage()
114+
metricStorage.WithContext(op.ctx)
115+
metricStorage.WithPrefix(app.PrometheusMetricsPrefix)
116+
metricStorage.WithNewRegistry()
117+
metricStorage.Start()
118+
op.HookMetricStorage = metricStorage
119+
}
120+
106121
// Init does some basic checks and instantiate dependencies
107122
//
108123
// - check directories
@@ -416,7 +431,7 @@ func (op *ShellOperator) TaskHandleHookRun(t task.Task) queue.TaskResult {
416431
metrics, err := taskHook.Run(hookMeta.BindingType, hookMeta.BindingContext, hookLogLabels)
417432

418433
if err == nil {
419-
err = op.MetricStorage.SendBatch(metrics, map[string]string{
434+
err = op.HookMetricStorage.SendBatch(metrics, map[string]string{
420435
"hook": hookMeta.HookName,
421436
})
422437
}
@@ -674,10 +689,10 @@ func (op *ShellOperator) SetupHttpServerHandles() {
674689
</html>`, app.ListenPort)
675690
})
676691

677-
http.Handle("/metrics", promhttp.Handler())
692+
http.Handle("/metrics", op.MetricStorage.Handler())
678693
}
679694

680-
func (op *ShellOperator) StartHttpServer(ip string, port string) error {
695+
func (op *ShellOperator) StartHttpServer(ip string, port string, mux *http.ServeMux) error {
681696
address := fmt.Sprintf("%s:%s", ip, port)
682697

683698
// Check if port is available
@@ -690,7 +705,7 @@ func (op *ShellOperator) StartHttpServer(ip string, port string) error {
690705
log.Infof("Listen on %s", address)
691706

692707
go func() {
693-
if err := http.Serve(listener, nil); err != nil {
708+
if err := http.Serve(listener, mux); err != nil {
694709
log.Errorf("Error starting HTTP server: %s", err)
695710
os.Exit(1)
696711
}
@@ -699,22 +714,48 @@ func (op *ShellOperator) StartHttpServer(ip string, port string) error {
699714
return nil
700715
}
701716

717+
func (op *ShellOperator) SetupHookMetricStorageAndServer() error {
718+
if op.HookMetricStorage != nil {
719+
return nil
720+
}
721+
if app.HookMetricsListenPort == "" || app.HookMetricsListenPort == app.ListenPort {
722+
// register default prom handler in DefaultServeMux
723+
op.HookMetricStorage = op.MetricStorage
724+
} else {
725+
// create new metric storage for hooks
726+
op.InitHookMetricStorage()
727+
// Create new ServeMux, serve on custom port
728+
mux := http.NewServeMux()
729+
err := op.StartHttpServer(app.ListenAddress, app.HookMetricsListenPort, mux)
730+
if err != nil {
731+
return err
732+
}
733+
// register scrape handler
734+
mux.Handle("/metrics", op.HookMetricStorage.Handler())
735+
}
736+
return nil
737+
}
738+
702739
// DefaultOperator returns a ShellOperator created with NewShellOperator and
// configured with a background context.
func DefaultOperator() *ShellOperator {
	op := NewShellOperator()
	op.WithContext(context.Background())
	return op
}
707744

708745
func InitAndStart(operator *ShellOperator) error {
709-
operator.SetupHttpServerHandles()
710-
711-
err := operator.StartHttpServer(app.ListenAddress, app.ListenPort)
746+
err := operator.StartHttpServer(app.ListenAddress, app.ListenPort, http.DefaultServeMux)
712747
if err != nil {
713748
log.Errorf("HTTP SERVER start failed: %v", err)
714749
return err
715750
}
716-
717751
operator.InitMetricStorage()
752+
operator.SetupHttpServerHandles()
753+
754+
err = operator.SetupHookMetricStorageAndServer()
755+
if err != nil {
756+
log.Errorf("HTTP SERVER for hook metrics start failed: %v", err)
757+
return err
758+
}
718759

719760
err = operator.Init()
720761
if err != nil {

0 commit comments

Comments
 (0)