Skip to content

Commit 0ca0e21

Browse files
DanKansdanielnelson
DanKans
authored andcommitted
Add fluentd input plugin (#2661)
1 parent 4138e8a commit 0ca0e21

File tree

4 files changed

+407
-0
lines changed

4 files changed

+407
-0
lines changed

plugins/inputs/all/all.go

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
_ "github.com/influxdata/telegraf/plugins/inputs/exec"
2424
_ "github.com/influxdata/telegraf/plugins/inputs/fail2ban"
2525
_ "github.com/influxdata/telegraf/plugins/inputs/filestat"
26+
_ "github.com/influxdata/telegraf/plugins/inputs/fluentd"
2627
_ "github.com/influxdata/telegraf/plugins/inputs/graylog"
2728
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
2829
_ "github.com/influxdata/telegraf/plugins/inputs/hddtemp"

plugins/inputs/fluentd/README.md

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Fluentd Input Plugin
2+
3+
The fluentd plugin gathers metrics from plugin endpoint provided by [in_monitor plugin](http://docs.fluentd.org/v0.12/articles/monitoring).
4+
This plugin understands data provided by /api/plugin.json resource (/api/config.json is not covered).
5+
6+
You might need to adjust your fluentd configuration, in order to reduce series cardinality in case whene your fluentd restarts frequently. Every time when fluentd starts, `plugin_id` value is given a new random value.
7+
According to [fluentd documentation](http://docs.fluentd.org/v0.12/articles/config-file), you are able to add `@id` parameter for each plugin to avoid this behaviour and define custom `plugin_id`.
8+
9+
example configuratio with `@id` parameter for http plugin:
10+
```
11+
<source>
12+
@type http
13+
@id http
14+
port 8888
15+
</source>
16+
```
17+
18+
### Configuration:
19+
20+
```toml
21+
# Read metrics exposed by fluentd in_monitor plugin
22+
[[inputs.fluentd]]
23+
## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint).
24+
##
25+
## Endpoint:
26+
## - only one URI is allowed
27+
## - https is not supported
28+
endpoint = "http://localhost:24220/api/plugins.json"
29+
30+
## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent)
31+
exclude = [
32+
"monitor_agent",
33+
"dummy",
34+
]
35+
```
36+
37+
### Measurements & Fields:
38+
39+
Fields may vary depends on type of the plugin
40+
41+
- fluentd
42+
- retry_count (float, unit)
43+
- buffer_queue_length (float, unit)
44+
- buffer_total_queued_size (float, unit)
45+
46+
### Tags:
47+
48+
- All measurements have the following tags:
49+
- plugin_id (unique plugin id)
50+
- plugin_type (type of the plugin e.g. s3)
51+
- plugin_category (plugin category e.g. output)
52+
53+
### Example Output:
54+
55+
```
56+
$ telegraf --config fluentd.conf --input-filter fluentd --test
57+
* Plugin: inputs.fluentd, Collection 1
58+
> fluentd,host=T440s,plugin_id=object:9f748c,plugin_category=input,plugin_type=dummy buffer_total_queued_size=0,buffer_queue_length=0,retry_count=0 1492006105000000000
59+
> fluentd,plugin_category=input,plugin_type=dummy,host=T440s,plugin_id=object:8da98c buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000
60+
> fluentd,plugin_id=object:820190,plugin_category=input,plugin_type=monitor_agent,host=T440s retry_count=0,buffer_total_queued_size=0,buffer_queue_length=0 1492006105000000000
61+
> fluentd,plugin_id=object:c5e054,plugin_category=output,plugin_type=stdout,host=T440s buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000
62+
> fluentd,plugin_type=s3,host=T440s,plugin_id=object:bd7a90,plugin_category=output buffer_queue_length=0,retry_count=0,buffer_total_queued_size=0 1492006105000000000
63+
64+
```

plugins/inputs/fluentd/fluentd.go

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package fluentd
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io/ioutil"
7+
"net/http"
8+
"net/url"
9+
"time"
10+
11+
"github.com/influxdata/telegraf"
12+
"github.com/influxdata/telegraf/plugins/inputs"
13+
)
14+
15+
const (
16+
measurement = "fluentd"
17+
description = "Read metrics exposed by fluentd in_monitor plugin"
18+
sampleConfig = `
19+
## This plugin reads information exposed by fluentd (using /api/plugins.json endpoint).
20+
##
21+
## Endpoint:
22+
## - only one URI is allowed
23+
## - https is not supported
24+
endpoint = "http://localhost:24220/api/plugins.json"
25+
26+
## Define which plugins have to be excluded (based on "type" field - e.g. monitor_agent)
27+
exclude = [
28+
"monitor_agent",
29+
"dummy",
30+
]
31+
`
32+
)
33+
34+
// Fluentd - plugin main structure
35+
type Fluentd struct {
36+
Endpoint string
37+
Exclude []string
38+
client *http.Client
39+
}
40+
41+
type endpointInfo struct {
42+
Payload []pluginData `json:"plugins"`
43+
}
44+
45+
type pluginData struct {
46+
PluginID string `json:"plugin_id"`
47+
PluginType string `json:"type"`
48+
PluginCategory string `json:"plugin_category"`
49+
RetryCount *float64 `json:"retry_count"`
50+
BufferQueueLength *float64 `json:"buffer_queue_length"`
51+
BufferTotalQueuedSize *float64 `json:"buffer_total_queued_size"`
52+
}
53+
54+
// parse JSON from fluentd Endpoint
55+
// Parameters:
56+
// data: unprocessed json recivied from endpoint
57+
//
58+
// Returns:
59+
// pluginData: slice that contains parsed plugins
60+
// error: error that may have occurred
61+
func parse(data []byte) (datapointArray []pluginData, err error) {
62+
var endpointData endpointInfo
63+
64+
if err = json.Unmarshal(data, &endpointData); err != nil {
65+
err = fmt.Errorf("Processing JSON structure")
66+
return
67+
}
68+
69+
for _, point := range endpointData.Payload {
70+
datapointArray = append(datapointArray, point)
71+
}
72+
73+
return
74+
}
75+
76+
// Description - display description
77+
func (h *Fluentd) Description() string { return description }
78+
79+
// SampleConfig - generate configuretion
80+
func (h *Fluentd) SampleConfig() string { return sampleConfig }
81+
82+
// Gather - Main code responsible for gathering, processing and creating metrics
83+
func (h *Fluentd) Gather(acc telegraf.Accumulator) error {
84+
85+
_, err := url.Parse(h.Endpoint)
86+
if err != nil {
87+
return fmt.Errorf("Invalid URL \"%s\"", h.Endpoint)
88+
}
89+
90+
if h.client == nil {
91+
92+
tr := &http.Transport{
93+
ResponseHeaderTimeout: time.Duration(3 * time.Second),
94+
}
95+
96+
client := &http.Client{
97+
Transport: tr,
98+
Timeout: time.Duration(4 * time.Second),
99+
}
100+
101+
h.client = client
102+
}
103+
104+
resp, err := h.client.Get(h.Endpoint)
105+
106+
if err != nil {
107+
return fmt.Errorf("Unable to perform HTTP client GET on \"%s\": %s", h.Endpoint, err)
108+
}
109+
110+
defer resp.Body.Close()
111+
112+
body, err := ioutil.ReadAll(resp.Body)
113+
114+
if err != nil {
115+
return fmt.Errorf("Unable to read the HTTP body \"%s\": %s", string(body), err)
116+
}
117+
118+
if resp.StatusCode != http.StatusOK {
119+
return fmt.Errorf("http status ok not met")
120+
}
121+
122+
dataPoints, err := parse(body)
123+
124+
if err != nil {
125+
return fmt.Errorf("Problem with parsing")
126+
}
127+
128+
// Go through all plugins one by one
129+
for _, p := range dataPoints {
130+
131+
skip := false
132+
133+
// Check if this specific type was excluded in configuration
134+
for _, exclude := range h.Exclude {
135+
if exclude == p.PluginType {
136+
skip = true
137+
}
138+
}
139+
140+
// If not, create new metric and add it to Accumulator
141+
if !skip {
142+
tmpFields := make(map[string]interface{})
143+
144+
tmpTags := map[string]string{
145+
"plugin_id": p.PluginID,
146+
"plugin_category": p.PluginCategory,
147+
"plugin_type": p.PluginType,
148+
}
149+
150+
if p.BufferQueueLength != nil {
151+
tmpFields["buffer_queue_length"] = p.BufferQueueLength
152+
153+
}
154+
if p.RetryCount != nil {
155+
tmpFields["retry_count"] = p.RetryCount
156+
}
157+
158+
if p.BufferTotalQueuedSize != nil {
159+
tmpFields["buffer_total_queued_size"] = p.BufferTotalQueuedSize
160+
}
161+
162+
if !((p.BufferQueueLength == nil) && (p.RetryCount == nil) && (p.BufferTotalQueuedSize == nil)) {
163+
acc.AddFields(measurement, tmpFields, tmpTags)
164+
}
165+
}
166+
}
167+
168+
return nil
169+
}
170+
171+
func init() {
172+
inputs.Add("fluentd", func() telegraf.Input { return &Fluentd{} })
173+
}

0 commit comments

Comments
 (0)