Skip to content

Commit 6ec98af

Browse files
piotr1212otherpirate
authored andcommitted
Add valuecounter aggregator plugin (influxdata#3523)
1 parent e4a3876 commit 6ec98af

File tree

5 files changed

+309
-0
lines changed

5 files changed

+309
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ formats may be used with input plugins supporting the `data_format` option:
282282
* [basicstats](./plugins/aggregators/basicstats)
283283
* [minmax](./plugins/aggregators/minmax)
284284
* [histogram](./plugins/aggregators/histogram)
285+
* [valuecounter](./plugins/aggregators/valuecounter)
285286

286287
## Output Plugins
287288

plugins/aggregators/all/all.go

+1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ import (
44
_ "github.com/influxdata/telegraf/plugins/aggregators/basicstats"
55
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
66
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
7+
_ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter"
78
)
+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# ValueCounter Aggregator Plugin
2+
3+
The valuecounter plugin counts the occurrence of values in fields and emits the
4+
counter once every 'period' seconds.
5+
6+
A use case for the valuecounter plugin is when you are processing a HTTP access
7+
log (with the logparser input) and want to count the HTTP status codes.
8+
9+
The fields which will be counted must be configured with the `fields`
10+
configuration directive. When no `fields` is provided the plugin will not count
11+
any fields. The results are emitted in fields in the format:
12+
`originalfieldname_fieldvalue = count`.
13+
14+
Valuecounter only works on fields of the type int, bool or string. Float fields
15+
are being dropped to prevent the creating of too many fields.
16+
17+
### Configuration:
18+
19+
```toml
20+
[[aggregators.valuecounter]]
21+
## General Aggregator Arguments:
22+
## The period on which to flush & clear the aggregator.
23+
period = "30s"
24+
## If true, the original metric will be dropped by the
25+
## aggregator and will not get sent to the output plugins.
26+
drop_original = false
27+
## The fields for which the values will be counted
28+
fields = ["status"]
29+
```
30+
31+
### Measurements & Fields:
32+
33+
- measurement1
34+
- field_value1
35+
- field_value2
36+
37+
### Tags:
38+
39+
No tags are applied by this aggregator.
40+
41+
### Example Output:
42+
43+
Example for parsing a HTTP access log.
44+
45+
telegraf.conf:
46+
```
47+
[[inputs.logparser]]
48+
files = ["/tmp/tst.log"]
49+
[inputs.logparser.grok]
50+
patterns = ['%{DATA:url:tag} %{NUMBER:response:string}']
51+
measurement = "access"
52+
53+
[[aggregators.valuecounter]]
54+
namepass = ["access"]
55+
fields = ["response"]
56+
```
57+
58+
/tmp/tst.log
59+
```
60+
/some/path 200
61+
/some/path 401
62+
/some/path 200
63+
```
64+
65+
```
66+
$ telegraf --config telegraf.conf --quiet
67+
68+
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991487011
69+
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="401" 1511948755991522282
70+
access,url=/some/path,path=/tmp/tst.log,host=localhost.localdomain response="200" 1511948755991531697
71+
72+
access,path=/tmp/tst.log,host=localhost.localdomain,url=/some/path response_200=2i,response_401=1i 1511948761000000000
73+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package valuecounter
2+
3+
import (
4+
"fmt"
5+
"log"
6+
7+
"github.com/influxdata/telegraf"
8+
"github.com/influxdata/telegraf/plugins/aggregators"
9+
)
10+
11+
type aggregate struct {
12+
name string
13+
tags map[string]string
14+
fieldCount map[string]int
15+
}
16+
17+
// ValueCounter an aggregation plugin
18+
type ValueCounter struct {
19+
cache map[uint64]aggregate
20+
Fields []string
21+
}
22+
23+
// NewValueCounter create a new aggregation plugin which counts the occurances
24+
// of fields and emits the count.
25+
func NewValueCounter() telegraf.Aggregator {
26+
vc := &ValueCounter{}
27+
vc.Reset()
28+
return vc
29+
}
30+
31+
var sampleConfig = `
32+
## General Aggregator Arguments:
33+
## The period on which to flush & clear the aggregator.
34+
period = "30s"
35+
## If true, the original metric will be dropped by the
36+
## aggregator and will not get sent to the output plugins.
37+
drop_original = false
38+
## The fields for which the values will be counted
39+
fields = []
40+
`
41+
42+
// SampleConfig generates a sample config for the ValueCounter plugin
43+
func (vc *ValueCounter) SampleConfig() string {
44+
return sampleConfig
45+
}
46+
47+
// Description returns the description of the ValueCounter plugin
48+
func (vc *ValueCounter) Description() string {
49+
return "Count the occurance of values in fields."
50+
}
51+
52+
// Add is run on every metric which passes the plugin
53+
func (vc *ValueCounter) Add(in telegraf.Metric) {
54+
id := in.HashID()
55+
56+
// Check if the cache already has an entry for this metric, if not create it
57+
if _, ok := vc.cache[id]; !ok {
58+
a := aggregate{
59+
name: in.Name(),
60+
tags: in.Tags(),
61+
fieldCount: make(map[string]int),
62+
}
63+
vc.cache[id] = a
64+
}
65+
66+
// Check if this metric has fields which we need to count, if so increment
67+
// the count.
68+
for fk, fv := range in.Fields() {
69+
for _, cf := range vc.Fields {
70+
if fk == cf {
71+
// Do not process float types to prevent memory from blowing up
72+
switch fv.(type) {
73+
default:
74+
log.Printf("I! Valuecounter: Unsupported field type. " +
75+
"Must be an int, string or bool. Ignoring.")
76+
continue
77+
case uint64, int64, string, bool:
78+
}
79+
fn := fmt.Sprintf("%v_%v", fk, fv)
80+
vc.cache[id].fieldCount[fn]++
81+
}
82+
}
83+
}
84+
}
85+
86+
// Push emits the counters
87+
func (vc *ValueCounter) Push(acc telegraf.Accumulator) {
88+
for _, agg := range vc.cache {
89+
fields := map[string]interface{}{}
90+
91+
for field, count := range agg.fieldCount {
92+
fields[field] = count
93+
}
94+
95+
acc.AddFields(agg.name, fields, agg.tags)
96+
}
97+
}
98+
99+
// Reset the cache, executed after each push
100+
func (vc *ValueCounter) Reset() {
101+
vc.cache = make(map[uint64]aggregate)
102+
}
103+
104+
func init() {
105+
aggregators.Add("valuecounter", func() telegraf.Aggregator {
106+
return NewValueCounter()
107+
})
108+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package valuecounter
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/influxdata/telegraf"
8+
"github.com/influxdata/telegraf/metric"
9+
"github.com/influxdata/telegraf/testutil"
10+
)
11+
12+
// Create a valuecounter with config
13+
func NewTestValueCounter(fields []string) telegraf.Aggregator {
14+
vc := &ValueCounter{
15+
Fields: fields,
16+
}
17+
vc.Reset()
18+
19+
return vc
20+
}
21+
22+
var m1, _ = metric.New("m1",
23+
map[string]string{"foo": "bar"},
24+
map[string]interface{}{
25+
"status": 200,
26+
"somefield": 20.1,
27+
"foobar": "bar",
28+
},
29+
time.Now(),
30+
)
31+
32+
var m2, _ = metric.New("m1",
33+
map[string]string{"foo": "bar"},
34+
map[string]interface{}{
35+
"status": "OK",
36+
"ignoreme": "string",
37+
"andme": true,
38+
"boolfield": false,
39+
},
40+
time.Now(),
41+
)
42+
43+
func BenchmarkApply(b *testing.B) {
44+
vc := NewTestValueCounter([]string{"status"})
45+
46+
for n := 0; n < b.N; n++ {
47+
vc.Add(m1)
48+
vc.Add(m2)
49+
}
50+
}
51+
52+
// Test basic functionality
53+
func TestBasic(t *testing.T) {
54+
vc := NewTestValueCounter([]string{"status"})
55+
acc := testutil.Accumulator{}
56+
57+
vc.Add(m1)
58+
vc.Add(m2)
59+
vc.Add(m1)
60+
vc.Push(&acc)
61+
62+
expectedFields := map[string]interface{}{
63+
"status_200": 2,
64+
"status_OK": 1,
65+
}
66+
expectedTags := map[string]string{
67+
"foo": "bar",
68+
}
69+
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
70+
}
71+
72+
// Test with multiple fields to count
73+
func TestMultipleFields(t *testing.T) {
74+
vc := NewTestValueCounter([]string{"status", "somefield", "boolfield"})
75+
acc := testutil.Accumulator{}
76+
77+
vc.Add(m1)
78+
vc.Add(m2)
79+
vc.Add(m2)
80+
vc.Add(m1)
81+
vc.Push(&acc)
82+
83+
expectedFields := map[string]interface{}{
84+
"status_200": 2,
85+
"status_OK": 2,
86+
"boolfield_false": 2,
87+
}
88+
expectedTags := map[string]string{
89+
"foo": "bar",
90+
}
91+
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
92+
}
93+
94+
// Test with a reset between two runs
95+
func TestWithReset(t *testing.T) {
96+
vc := NewTestValueCounter([]string{"status"})
97+
acc := testutil.Accumulator{}
98+
99+
vc.Add(m1)
100+
vc.Add(m1)
101+
vc.Add(m2)
102+
vc.Push(&acc)
103+
104+
expectedFields := map[string]interface{}{
105+
"status_200": 2,
106+
"status_OK": 1,
107+
}
108+
expectedTags := map[string]string{
109+
"foo": "bar",
110+
}
111+
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
112+
113+
acc.ClearMetrics()
114+
vc.Reset()
115+
116+
vc.Add(m2)
117+
vc.Add(m2)
118+
vc.Add(m1)
119+
vc.Push(&acc)
120+
121+
expectedFields = map[string]interface{}{
122+
"status_200": 1,
123+
"status_OK": 2,
124+
}
125+
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
126+
}

0 commit comments

Comments
 (0)