Skip to content

Commit 3de9eb8

Browse files
ronnocolMathieu Lecarme
authored and
Mathieu Lecarme
committed
Add Splunk MultiMetric support (influxdata#6640)
1 parent 833ac73 commit 3de9eb8

File tree

5 files changed

+250
-66
lines changed

5 files changed

+250
-66
lines changed

internal/config/config.go

+13
Original file line numberDiff line numberDiff line change
@@ -1952,6 +1952,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
19521952
}
19531953
}
19541954

1955+
if node, ok := tbl.Fields["splunkmetric_multimetric"]; ok {
1956+
if kv, ok := node.(*ast.KeyValue); ok {
1957+
if b, ok := kv.Value.(*ast.Boolean); ok {
1958+
var err error
1959+
c.SplunkmetricMultiMetric, err = b.Boolean()
1960+
if err != nil {
1961+
return nil, err
1962+
}
1963+
}
1964+
}
1965+
}
1966+
19551967
if node, ok := tbl.Fields["wavefront_source_override"]; ok {
19561968
if kv, ok := node.(*ast.KeyValue); ok {
19571969
if ary, ok := kv.Value.(*ast.Array); ok {
@@ -1985,6 +1997,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
19851997
delete(tbl.Fields, "template")
19861998
delete(tbl.Fields, "json_timestamp_units")
19871999
delete(tbl.Fields, "splunkmetric_hec_routing")
2000+
delete(tbl.Fields, "splunkmetric_multimetric")
19882001
delete(tbl.Fields, "wavefront_source_override")
19892002
delete(tbl.Fields, "wavefront_use_strict")
19902003
return serializers.NewSerializer(c)

plugins/serializers/registry.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ type Config struct {
7373
// Include HEC routing fields for splunkmetric output
7474
HecRouting bool
7575

76+
// Enable Splunk MultiMetric output (Splunk 8.0+)
77+
SplunkmetricMultiMetric bool
78+
7679
// Point tags to use as the source name for Wavefront (if none found, host will be used).
7780
WavefrontSourceOverride []string
7881

@@ -93,7 +96,7 @@ func NewSerializer(config *Config) (Serializer, error) {
9396
case "json":
9497
serializer, err = NewJsonSerializer(config.TimestampUnits)
9598
case "splunkmetric":
96-
serializer, err = NewSplunkmetricSerializer(config.HecRouting)
99+
serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric)
97100
case "nowmetric":
98101
serializer, err = NewNowSerializer()
99102
case "carbon2":
@@ -118,8 +121,8 @@ func NewCarbon2Serializer() (Serializer, error) {
118121
return carbon2.NewSerializer()
119122
}
120123

121-
func NewSplunkmetricSerializer(splunkmetric_hec_routing bool) (Serializer, error) {
122-
return splunkmetric.NewSerializer(splunkmetric_hec_routing)
124+
func NewSplunkmetricSerializer(splunkmetric_hec_routing bool, splunkmetric_multimetric bool) (Serializer, error) {
125+
return splunkmetric.NewSerializer(splunkmetric_hec_routing, splunkmetric_multimetric)
123126
}
124127

125128
func NewNowSerializer() (Serializer, error) {

plugins/serializers/splunkmetric/README.md

+33-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,36 @@ In the above snippet, the following keys are dimensions:
2727
* dc
2828
* user
2929

30+
## Using Multimetric output
31+
32+
Starting with Splunk Enterprise and Splunk Cloud 8.0, you can now send multiple metric values in one payload. This means, for example, that
33+
you can send all of your CPU stats in one JSON struct, an example event looks like:
34+
35+
```javascript
36+
{
37+
"time": 1572469920,
38+
"event": "metric",
39+
"host": "mono.local",
40+
"fields": {
41+
"_config_hecRouting": false,
42+
"_config_multiMetric": true,
43+
"class": "osx",
44+
"cpu": "cpu0",
45+
"metric_name:telegraf.cpu.usage_guest": 0,
46+
"metric_name:telegraf.cpu.usage_guest_nice": 0,
47+
"metric_name:telegraf.cpu.usage_idle": 65.1,
48+
"metric_name:telegraf.cpu.usage_iowait": 0,
49+
"metric_name:telegraf.cpu.usage_irq": 0,
50+
"metric_name:telegraf.cpu.usage_nice": 0,
51+
"metric_name:telegraf.cpu.usage_softirq": 0,
52+
"metric_name:telegraf.cpu.usage_steal": 0,
53+
"metric_name:telegraf.cpu.usage_system": 10.2,
54+
"metric_name:telegraf.cpu.usage_user": 24.7,
55+
}
56+
}
57+
```
58+
In order to enable this mode, there's a new option `splunkmetric_multimetric` that you set in the appropriate output module you plan on using.
59+
3060
## Using with the HTTP output
3161

3262
To send this data to a Splunk HEC, you can use the HTTP output, there are some custom headers that you need to add
@@ -61,6 +91,7 @@ to manage the HEC authorization, here's a sample config for an HTTP output:
6191
data_format = "splunkmetric"
6292
## Provides time, index, source overrides for the HEC
6393
splunkmetric_hec_routing = true
94+
# splunkmentric_multimetric = true
6495

6596
## Additional HTTP headers
6697
[outputs.http.headers]
@@ -118,7 +149,6 @@ disabled = false
118149
INDEXED_EXTRACTIONS = json
119150
KV_MODE = none
120151
TIMESTAMP_FIELDS = time
121-
TIME_FORMAT = %s.%3N
122152
```
123153

124154
An example configuration of a file based output is:
@@ -134,5 +164,6 @@ An example configuration of a file based output is:
134164
## more about them here:
135165
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
136166
data_format = "splunkmetric"
137-
hec_routing = false
167+
splunkmetric_hec_routing = false
168+
splunkmetric_multimetric = true
138169
```

plugins/serializers/splunkmetric/splunkmetric.go

+136-43
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,33 @@ import (
99
)
1010

1111
type serializer struct {
12-
HecRouting bool
12+
HecRouting bool
13+
SplunkmetricMultiMetric bool
1314
}
1415

15-
func NewSerializer(splunkmetric_hec_routing bool) (*serializer, error) {
16+
type CommonTags struct {
17+
Time float64
18+
Host string
19+
Index string
20+
Source string
21+
Fields map[string]interface{}
22+
}
23+
24+
type HECTimeSeries struct {
25+
Time float64 `json:"time"`
26+
Event string `json:"event"`
27+
Host string `json:"host,omitempty"`
28+
Index string `json:"index,omitempty"`
29+
Source string `json:"source,omitempty"`
30+
Fields map[string]interface{} `json:"fields"`
31+
}
32+
33+
// NewSerializer Setup our new serializer
34+
func NewSerializer(splunkmetric_hec_routing bool, splunkmetric_multimetric bool) (*serializer, error) {
35+
/* Define output params */
1636
s := &serializer{
17-
HecRouting: splunkmetric_hec_routing,
37+
HecRouting: splunkmetric_hec_routing,
38+
SplunkmetricMultiMetric: splunkmetric_multimetric,
1839
}
1940
return s, nil
2041
}
@@ -45,26 +66,61 @@ func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
4566
return serialized, nil
4667
}
4768

48-
func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, err error) {
69+
func (s *serializer) createMulti(metric telegraf.Metric, dataGroup HECTimeSeries, commonTags CommonTags) (metricGroup []byte, err error) {
70+
/* When splunkmetric_multimetric is true, then we can write out multiple name=value pairs as part of the same
71+
** event payload. This only works when the time, host, and dimensions are the same for every name=value pair
72+
** in the timeseries data.
73+
**
74+
** The format for multimetric data is 'metric_name:nameOfMetric = valueOfMetric'
75+
*/
76+
var metricJSON []byte
77+
78+
// Set the event data from the commonTags above.
79+
dataGroup.Event = "metric"
80+
dataGroup.Time = commonTags.Time
81+
dataGroup.Host = commonTags.Host
82+
dataGroup.Index = commonTags.Index
83+
dataGroup.Source = commonTags.Source
84+
dataGroup.Fields = commonTags.Fields
85+
86+
// Stuff the metrid data into the structure.
87+
for _, field := range metric.FieldList() {
88+
value, valid := verifyValue(field.Value)
4989

50-
/* Splunk supports one metric json object, and does _not_ support an array of JSON objects.
51-
** Splunk has the following required names for the metric store:
52-
** metric_name: The name of the metric
53-
** _value: The value for the metric
54-
** time: The timestamp for the metric
55-
** All other index fields become dimensions.
56-
*/
57-
type HECTimeSeries struct {
58-
Time float64 `json:"time"`
59-
Event string `json:"event"`
60-
Host string `json:"host,omitempty"`
61-
Index string `json:"index,omitempty"`
62-
Source string `json:"source,omitempty"`
63-
Fields map[string]interface{} `json:"fields"`
90+
if !valid {
91+
log.Printf("D! Can not parse value: %v for key: %v", field.Value, field.Key)
92+
continue
93+
}
94+
95+
dataGroup.Fields["metric_name:"+metric.Name()+"."+field.Key] = value
6496
}
6597

66-
dataGroup := HECTimeSeries{}
67-
var metricJson []byte
98+
// Manage the rest of the event details based upon HEC routing rules
99+
switch s.HecRouting {
100+
case true:
101+
// Output the data as a fields array and host,index,time,source overrides for the HEC.
102+
metricJSON, err = json.Marshal(dataGroup)
103+
default:
104+
// Just output the data and the time, useful for file based outuputs
105+
dataGroup.Fields["time"] = dataGroup.Time
106+
metricJSON, err = json.Marshal(dataGroup.Fields)
107+
}
108+
if err != nil {
109+
return nil, err
110+
}
111+
// Let the JSON fall through to the return below
112+
metricGroup = metricJSON
113+
114+
return metricGroup, nil
115+
}
116+
117+
func (s *serializer) createSingle(metric telegraf.Metric, dataGroup HECTimeSeries, commonTags CommonTags) (metricGroup []byte, err error) {
118+
/* The default mode is to generate one JSON entitiy per metric (required for pre-8.0 Splunks)
119+
**
120+
** The format for single metric is 'nameOfMetric = valueOfMetric'
121+
*/
122+
123+
var metricJSON []byte
68124

69125
for _, field := range metric.FieldList() {
70126

@@ -75,39 +131,30 @@ func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, e
75131
continue
76132
}
77133

78-
obj := map[string]interface{}{}
79-
obj["metric_name"] = metric.Name() + "." + field.Key
80-
obj["_value"] = value
81-
82134
dataGroup.Event = "metric"
83-
// Convert ns to float seconds since epoch.
84-
dataGroup.Time = float64(metric.Time().UnixNano()) / float64(1000000000)
85-
dataGroup.Fields = obj
86-
87-
// Break tags out into key(n)=value(t) pairs
88-
for n, t := range metric.Tags() {
89-
if n == "host" {
90-
dataGroup.Host = t
91-
} else if n == "index" {
92-
dataGroup.Index = t
93-
} else if n == "source" {
94-
dataGroup.Source = t
95-
} else {
96-
dataGroup.Fields[n] = t
97-
}
98-
}
135+
136+
dataGroup.Time = commonTags.Time
137+
138+
// Apply the common tags from above to every record.
139+
dataGroup.Host = commonTags.Host
140+
dataGroup.Index = commonTags.Index
141+
dataGroup.Source = commonTags.Source
142+
dataGroup.Fields = commonTags.Fields
143+
144+
dataGroup.Fields["metric_name"] = metric.Name() + "." + field.Key
145+
dataGroup.Fields["_value"] = value
99146

100147
switch s.HecRouting {
101148
case true:
102149
// Output the data as a fields array and host,index,time,source overrides for the HEC.
103-
metricJson, err = json.Marshal(dataGroup)
150+
metricJSON, err = json.Marshal(dataGroup)
104151
default:
105152
// Just output the data and the time, useful for file based outuputs
106153
dataGroup.Fields["time"] = dataGroup.Time
107-
metricJson, err = json.Marshal(dataGroup.Fields)
154+
metricJSON, err = json.Marshal(dataGroup.Fields)
108155
}
109156

110-
metricGroup = append(metricGroup, metricJson...)
157+
metricGroup = append(metricGroup, metricJSON...)
111158

112159
if err != nil {
113160
return nil, err
@@ -117,6 +164,52 @@ func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, e
117164
return metricGroup, nil
118165
}
119166

167+
func (s *serializer) createObject(metric telegraf.Metric) (metricGroup []byte, err error) {
168+
169+
/* Splunk supports one metric json object, and does _not_ support an array of JSON objects.
170+
** Splunk has the following required names for the metric store:
171+
** metric_name: The name of the metric
172+
** _value: The value for the metric
173+
** time: The timestamp for the metric
174+
** All other index fields become dimensions.
175+
*/
176+
177+
dataGroup := HECTimeSeries{}
178+
179+
// The tags are common to all events in this timeseries
180+
commonTags := CommonTags{}
181+
182+
commonObj := map[string]interface{}{}
183+
184+
commonObj["config:hecRouting"] = s.HecRouting
185+
commonObj["config:multiMetric"] = s.SplunkmetricMultiMetric
186+
187+
commonTags.Fields = commonObj
188+
189+
// Break tags out into key(n)=value(t) pairs
190+
for n, t := range metric.Tags() {
191+
if n == "host" {
192+
commonTags.Host = t
193+
} else if n == "index" {
194+
commonTags.Index = t
195+
} else if n == "source" {
196+
commonTags.Source = t
197+
} else {
198+
commonTags.Fields[n] = t
199+
}
200+
}
201+
commonTags.Time = float64(metric.Time().UnixNano()) / float64(1000000000)
202+
switch s.SplunkmetricMultiMetric {
203+
case true:
204+
metricGroup, _ = s.createMulti(metric, dataGroup, commonTags)
205+
default:
206+
metricGroup, _ = s.createSingle(metric, dataGroup, commonTags)
207+
}
208+
209+
// Return the metric group regardless of if it's multimetric or single metric.
210+
return metricGroup, nil
211+
}
212+
120213
func verifyValue(v interface{}) (value interface{}, valid bool) {
121214
switch v.(type) {
122215
case string:

0 commit comments

Comments
 (0)