-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathhandler.go
149 lines (130 loc) · 3.51 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package main
import (
"bytes"
"encoding/json"
"log"
"regexp"
"strconv"
"strings"
"text/template"
sprig "github.com/Masterminds/sprig/v3"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/prometheus/client_golang/prometheus"
)
type TemplateData struct {
Payload interface{}
Matches []string
}
func NewMessageHandler(metrics []*Metric, namespace string) (mqtt.MessageHandler, error) {
var err error
names := make([]string, len(metrics))
regexps := make([]*regexp.Regexp, len(metrics))
valueTemplates := make([]*template.Template, len(metrics))
labelNames := make([][]string, len(metrics))
labelTemplates := make([][]*template.Template, len(metrics))
collectors := make([]*prometheus.GaugeVec, len(metrics))
for i, metric := range metrics {
names[i] = metric.Name
// compile match regexp
if metric.Match != "" {
regexps[i], err = regexp.Compile(metric.Match)
if err != nil {
return nil, err
}
}
// parse value template
if metric.Value != "" {
valueTemplates[i], err = template.New("value").Funcs(sprig.TxtFuncMap()).Parse(metric.Value)
if err != nil {
return nil, err
}
}
// parse label templates
labelNames[i] = make([]string, 0, len(metric.Labels))
labelTemplates[i] = make([]*template.Template, 0, len(metric.Labels))
for label, tplString := range metric.Labels {
labelNames[i] = append(labelNames[i], label)
tpl, err := template.New("label").Funcs(sprig.TxtFuncMap()).Parse(tplString)
if err != nil {
return nil, err
}
labelTemplates[i] = append(labelTemplates[i], tpl)
}
// create prometheus collector
collectors[i] = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: names[i],
}, labelNames[i])
err = prometheus.Register(collectors[i])
if err != nil {
return nil, err
}
}
// return the handler function for mqtt
return func(client mqtt.Client, msg mqtt.Message) {
var payload interface{}
var payloadIsFloat bool
// try to parse payload as float64 or JSON, else use as string
stringVal := string(msg.Payload())
floatVal, err := strconv.ParseFloat(stringVal, 64)
if err == nil {
payload = floatVal
payloadIsFloat = true
} else {
mapVal := make(map[string]interface{})
err = json.Unmarshal(msg.Payload(), &mapVal)
if err == nil {
payload = mapVal
} else {
payload = stringVal
}
}
// repeat for every configured metric
for i := range names {
// check if topic matches filter
var matches []string
if regexps[i] != nil {
matches = regexps[i].FindStringSubmatch(msg.Topic())
if matches == nil {
continue
}
}
// provide .Payload and .Matches to templates
data := &TemplateData{
Payload: payload,
Matches: matches,
}
// render template for value, parse as float64
var buf bytes.Buffer
if valueTemplates[i] != nil {
err = valueTemplates[i].Execute(&buf, data)
if err != nil {
log.Println(err)
continue
}
floatVal, err = strconv.ParseFloat(strings.TrimSpace(buf.String()), 64)
if err != nil {
continue
}
} else {
if !payloadIsFloat {
// payload is not numeric and no value template was given: skip
continue
}
}
// render template(s) for labels
labels := make(prometheus.Labels)
for j, tpl := range labelTemplates[i] {
buf.Reset()
err = tpl.Execute(&buf, data)
if err != nil {
log.Println(err)
continue
}
labels[labelNames[i][j]] = buf.String()
}
// update collector
collectors[i].With(labels).Set(floatVal)
}
}, nil
}