Skip to content

Commit 4f6ee8c

Browse files
committed
Allow exec plugin to parse line-protocol
closes #613
1 parent 338341a commit 4f6ee8c

File tree

4 files changed

+162
-34
lines changed

4 files changed

+162
-34
lines changed

metric.go

+3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package telegraf
22

33
import (
4+
"bytes"
45
"time"
56

67
"github.com/influxdata/influxdb/client/v2"
@@ -68,6 +69,8 @@ func NewMetric(
6869
// a non-nil error will be returned in addition to the metrics that parsed
6970
// successfully.
7071
func ParseMetrics(buf []byte) ([]Metric, error) {
72+
// parse even if the buffer begins with a newline
73+
buf = bytes.TrimPrefix(buf, []byte("\n"))
7174
points, err := models.ParsePoints(buf)
7275
metrics := make([]Metric, len(points))
7376
for i, point := range points {

plugins/inputs/exec/README.md

+55-18
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,39 @@
1-
# Exec Plugin
1+
# Exec Input Plugin
22

3-
The exec plugin can execute arbitrary commands which output JSON. Then it flattens JSON and finds
4-
all numeric values, treating them as floats.
3+
The exec plugin can execute arbitrary commands which output JSON or
4+
InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/).
55

6-
For example, if you have a json-returning command called mycollector, you could
7-
setup the exec plugin with:
6+
If using JSON, only numeric values are parsed and turned into floats. Booleans
7+
and strings will be ignored.
8+
9+
### Configuration
810

911
```
12+
# Read flattened metrics from one or more commands that output JSON to stdout
1013
[[inputs.exec]]
11-
command = "/usr/bin/mycollector --output=json"
14+
# the command to run
15+
command = "/usr/bin/mycollector --foo=bar"
16+
17+
# Data format to consume. This can be "json" or "influx" (line-protocol)
18+
# NOTE json only reads numerical measurements, strings and booleans are ignored.
19+
data_format = "json"
20+
21+
# measurement name suffix (for separating different commands)
1222
name_suffix = "_mycollector"
13-
interval = "10s"
1423
```
1524

16-
The name suffix is appended to exec as "exec_name_suffix" to identify the input stream.
25+
Other options for modifying the measurement names are:
1726

18-
The interval is used to determine how often a particular command should be run. Each
19-
time the exec plugin runs, it will only run a particular command if it has been at least
20-
`interval` seconds since the exec plugin last ran the command.
27+
```
28+
name_override = "measurement_name"
29+
name_prefix = "prefix_"
30+
```
2131

32+
### Example 1
2233

23-
# Sample
34+
Let's say that we have the above configuration, and mycollector outputs the
35+
following JSON:
2436

25-
Let's say that we have a command with the name_suffix "_mycollector", which gives the following output:
2637
```json
2738
{
2839
"a": 0.5,
@@ -33,13 +44,39 @@ Let's say that we have a command with the name_suffix "_mycollector", which give
3344
}
3445
```
3546

36-
The collected metrics will be stored as field values under the same measurement "exec_mycollector":
47+
The collected metrics will be stored as fields under the measurement
48+
"exec_mycollector":
49+
3750
```
38-
exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567
51+
exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567
3952
```
4053

41-
Other options for modifying the measurement names are:
54+
### Example 2
55+
56+
Now let's say we have the following configuration:
57+
4258
```
43-
name_override = "newname"
44-
name_prefix = "prefix_"
59+
[[inputs.exec]]
60+
# the command to run
61+
command = "/usr/bin/line_protocol_collector"
62+
63+
# Data format to consume. This can be "json" or "influx" (line-protocol)
64+
# NOTE json only reads numerical measurements, strings and booleans are ignored.
65+
data_format = "influx"
4566
```
67+
68+
And line_protocol_collector outputs the following line protocol:
69+
70+
```
71+
cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
72+
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
73+
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
74+
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
75+
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
76+
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
77+
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
78+
```
79+
80+
You will get data in InfluxDB exactly as it is defined above,
81+
tags are cpu=cpuN, host=foo, and datacenter=us-east with fields usage_idle
82+
and usage_busy. They will receive a timestamp at collection time.

plugins/inputs/exec/exec.go

+31-16
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"os/exec"
8+
"time"
89

910
"github.com/gonuts/go-shellquote"
1011

@@ -14,18 +15,20 @@ import (
1415
)
1516

1617
const sampleConfig = `
17-
# NOTE This plugin only reads numerical measurements, strings and booleans
18-
# will be ignored.
19-
2018
# the command to run
2119
command = "/usr/bin/mycollector --foo=bar"
2220
21+
# Data format to consume. This can be "json" or "influx" (line-protocol)
22+
# NOTE json only reads numerical measurements, strings and booleans are ignored.
23+
data_format = "json"
24+
2325
# measurement name suffix (for separating different commands)
2426
name_suffix = "_mycollector"
2527
`
2628

2729
type Exec struct {
28-
Command string
30+
Command string
31+
DataFormat string
2932

3033
runner Runner
3134
}
@@ -71,20 +74,32 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
7174
return err
7275
}
7376

74-
var jsonOut interface{}
75-
err = json.Unmarshal(out, &jsonOut)
76-
if err != nil {
77-
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
78-
e.Command, err)
79-
}
80-
81-
f := internal.JSONFlattener{}
82-
err = f.FlattenJSON("", jsonOut)
83-
if err != nil {
77+
switch e.DataFormat {
78+
case "", "json":
79+
var jsonOut interface{}
80+
err = json.Unmarshal(out, &jsonOut)
81+
if err != nil {
82+
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
83+
e.Command, err)
84+
}
85+
86+
f := internal.JSONFlattener{}
87+
err = f.FlattenJSON("", jsonOut)
88+
if err != nil {
89+
return err
90+
}
91+
acc.AddFields("exec", f.Fields, nil)
92+
case "influx":
93+
now := time.Now()
94+
metrics, err := telegraf.ParseMetrics(out)
95+
for _, metric := range metrics {
96+
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
97+
}
8498
return err
99+
default:
100+
return fmt.Errorf("Unsupported data format: %s. Must be either json "+
101+
"or influx.", e.DataFormat)
85102
}
86-
87-
acc.AddFields("exec", f.Fields, nil)
88103
return nil
89104
}
90105

plugins/inputs/exec/exec_test.go

+73
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,18 @@ const malformedJson = `
3131
"status": "green",
3232
`
3333

34+
const lineProtocol = "cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1"
35+
36+
const lineProtocolMulti = `
37+
cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
38+
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
39+
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
40+
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
41+
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
42+
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
43+
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
44+
`
45+
3446
type runnerMock struct {
3547
out []byte
3648
err error
@@ -97,3 +109,64 @@ func TestCommandError(t *testing.T) {
97109
require.Error(t, err)
98110
assert.Equal(t, acc.NFields(), 0, "No new points should have been added")
99111
}
112+
113+
func TestLineProtocolParse(t *testing.T) {
114+
e := &Exec{
115+
runner: newRunnerMock([]byte(lineProtocol), nil),
116+
Command: "line-protocol",
117+
DataFormat: "influx",
118+
}
119+
120+
var acc testutil.Accumulator
121+
err := e.Gather(&acc)
122+
require.NoError(t, err)
123+
124+
fields := map[string]interface{}{
125+
"usage_idle": float64(99),
126+
"usage_busy": float64(1),
127+
}
128+
tags := map[string]string{
129+
"host": "foo",
130+
"datacenter": "us-east",
131+
}
132+
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
133+
}
134+
135+
func TestLineProtocolParseMultiple(t *testing.T) {
136+
e := &Exec{
137+
runner: newRunnerMock([]byte(lineProtocolMulti), nil),
138+
Command: "line-protocol",
139+
DataFormat: "influx",
140+
}
141+
142+
var acc testutil.Accumulator
143+
err := e.Gather(&acc)
144+
require.NoError(t, err)
145+
146+
fields := map[string]interface{}{
147+
"usage_idle": float64(99),
148+
"usage_busy": float64(1),
149+
}
150+
tags := map[string]string{
151+
"host": "foo",
152+
"datacenter": "us-east",
153+
}
154+
cpuTags := []string{"cpu0", "cpu1", "cpu2", "cpu3", "cpu4", "cpu5", "cpu6"}
155+
156+
for _, cpu := range cpuTags {
157+
tags["cpu"] = cpu
158+
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
159+
}
160+
}
161+
162+
func TestInvalidDataFormat(t *testing.T) {
163+
e := &Exec{
164+
runner: newRunnerMock([]byte(lineProtocol), nil),
165+
Command: "bad data format",
166+
DataFormat: "FooBar",
167+
}
168+
169+
var acc testutil.Accumulator
170+
err := e.Gather(&acc)
171+
require.Error(t, err)
172+
}

0 commit comments

Comments
 (0)