Skip to content

Commit a5bda29

Browse files
oplehtoMathieu Lecarme
authored and
Mathieu Lecarme
committed
Add option to reset buckets on flush to histogram aggregator (influxdata#5641)
1 parent 0bf6af9 commit a5bda29

File tree

3 files changed

+53
-12
lines changed

3 files changed

+53
-12
lines changed

plugins/aggregators/histogram/README.md

+7-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ Values added to a bucket are also added to the larger buckets in the
77
distribution. This creates a [cumulative histogram](https://en.wikipedia.org/wiki/Histogram#/media/File:Cumulative_vs_normal_histogram.svg).
88

99
Like other Telegraf aggregators, the metric is emitted every `period` seconds.
10-
Bucket counts however are not reset between periods and will be non-strictly
11-
increasing while Telegraf is running.
10+
By default bucket counts are not reset between periods and will be non-strictly
11+
increasing while Telegraf is running. This behavior can be changed by setting the
12+
`reset` parameter to true.
1213

1314
#### Design
1415

@@ -34,6 +35,10 @@ of the algorithm which is implemented in the Prometheus
3435
## aggregator and will not get sent to the output plugins.
3536
drop_original = false
3637

38+
## If true, the histogram will be reset on flush instead
39+
## of accumulating the results.
40+
reset = false
41+
3742
## Example config that aggregates all fields of the metric.
3843
# [[aggregators.histogram.config]]
3944
# ## The set of buckets.

plugins/aggregators/histogram/histogram.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ const bucketInf = "+Inf"
1616

1717
// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics
1818
type HistogramAggregator struct {
19-
Configs []config `toml:"config"`
19+
Configs []config `toml:"config"`
20+
ResetBuckets bool `toml:"reset"`
2021

2122
buckets bucketsByMetrics
2223
cache map[uint64]metricHistogramCollection
@@ -72,6 +73,10 @@ var sampleConfig = `
7273
## aggregator and will not get sent to the output plugins.
7374
drop_original = false
7475
76+
## If true, the histogram will be reset on flush instead
77+
## of accumulating the results.
78+
reset = false
79+
7580
## Example config that aggregates all fields of the metric.
7681
# [[aggregators.histogram.config]]
7782
# ## The set of buckets.
@@ -201,9 +206,15 @@ func (h *HistogramAggregator) groupField(
201206
)
202207
}
203208

204-
// Reset does nothing, because we need to collect counts for a long time, otherwise if config parameter 'reset' has
205-
// small value, we will get a histogram with a small amount of the distribution.
206-
func (h *HistogramAggregator) Reset() {}
209+
// Reset does nothing by default, because we typically need to collect counts for a long time.
210+
// Otherwise if config parameter 'reset' has 'true' value, we will get a histogram
211+
// with a small amount of the distribution. However in some use cases a reset is useful.
212+
func (h *HistogramAggregator) Reset() {
213+
if h.ResetBuckets {
214+
h.resetCache()
215+
h.buckets = make(bucketsByMetrics)
216+
}
217+
}
207218

208219
// resetCache resets cached counts(hits) in the buckets
209220
func (h *HistogramAggregator) resetCache() {

plugins/aggregators/histogram/histogram_test.go

+31-6
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import (
1212
)
1313

1414
// NewTestHistogram creates new test histogram aggregation with specified config
15-
func NewTestHistogram(cfg []config) telegraf.Aggregator {
16-
htm := &HistogramAggregator{Configs: cfg}
15+
func NewTestHistogram(cfg []config, reset bool) telegraf.Aggregator {
16+
htm := &HistogramAggregator{Configs: cfg, ResetBuckets: reset}
1717
htm.buckets = make(bucketsByMetrics)
1818
htm.resetCache()
1919

@@ -69,11 +69,12 @@ func BenchmarkApply(b *testing.B) {
6969
func TestHistogramWithPeriodAndOneField(t *testing.T) {
7070
var cfg []config
7171
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
72-
histogram := NewTestHistogram(cfg)
72+
histogram := NewTestHistogram(cfg, false)
7373

7474
acc := &testutil.Accumulator{}
7575

7676
histogram.Add(firstMetric1)
77+
histogram.Reset()
7778
histogram.Add(firstMetric2)
7879
histogram.Push(acc)
7980

@@ -88,12 +89,36 @@ func TestHistogramWithPeriodAndOneField(t *testing.T) {
8889
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf)
8990
}
9091

92+
// TestHistogramWithPeriodAndOneField tests metrics for one period and for one field
93+
func TestHistogramWithReset(t *testing.T) {
94+
var cfg []config
95+
cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
96+
histogram := NewTestHistogram(cfg, true)
97+
98+
acc := &testutil.Accumulator{}
99+
100+
histogram.Add(firstMetric1)
101+
histogram.Reset()
102+
histogram.Add(firstMetric2)
103+
histogram.Push(acc)
104+
105+
if len(acc.Metrics) != 6 {
106+
assert.Fail(t, "Incorrect number of metrics")
107+
}
108+
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "0")
109+
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "10")
110+
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "20")
111+
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "30")
112+
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "40")
113+
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, bucketInf)
114+
}
115+
91116
// TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields
92117
func TestHistogramWithPeriodAndAllFields(t *testing.T) {
93118
var cfg []config
94119
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}})
95120
cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}})
96-
histogram := NewTestHistogram(cfg)
121+
histogram := NewTestHistogram(cfg, false)
97122

98123
acc := &testutil.Accumulator{}
99124

@@ -127,7 +152,7 @@ func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) {
127152

128153
var cfg []config
129154
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}})
130-
histogram := NewTestHistogram(cfg)
155+
histogram := NewTestHistogram(cfg, false)
131156

132157
acc := &testutil.Accumulator{}
133158
histogram.Add(firstMetric1)
@@ -166,7 +191,7 @@ func TestWrongBucketsOrder(t *testing.T) {
166191

167192
var cfg []config
168193
cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}})
169-
histogram := NewTestHistogram(cfg)
194+
histogram := NewTestHistogram(cfg, false)
170195
histogram.Add(firstMetric2)
171196
}
172197

0 commit comments

Comments
 (0)