Skip to content

Commit 3d571ff

Browse files
committed
Add Accumulator to the ServiceInput Start() function
closes #666
1 parent 7f539c9 commit 3d571ff

File tree

14 files changed

+251
-275
lines changed

14 files changed

+251
-275
lines changed

agent/agent.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ func (a *Agent) Connect() error {
5858
}
5959
err := o.Output.Connect()
6060
if err != nil {
61-
log.Printf("Failed to connect to output %s, retrying in 15s, error was '%s' \n", o.Name, err)
61+
log.Printf("Failed to connect to output %s, retrying in 15s, "+
62+
"error was '%s' \n", o.Name, err)
6263
time.Sleep(15 * time.Second)
6364
err = o.Output.Connect()
6465
if err != nil {
@@ -241,7 +242,7 @@ func (a *Agent) Test() error {
241242
return nil
242243
}
243244

244-
// flush writes a list of points to all configured outputs
245+
// flush writes a list of metrics to all configured outputs
245246
func (a *Agent) flush() {
246247
var wg sync.WaitGroup
247248

@@ -260,7 +261,7 @@ func (a *Agent) flush() {
260261
wg.Wait()
261262
}
262263

263-
// flusher monitors the points input channel and flushes on the minimum interval
264+
// flusher monitors the metrics input channel and flushes on the minimum interval
264265
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
265266
// Inelegant, but this sleep is to allow the Gather threads to run, so that
266267
// the flusher will flush after metrics are collected.
@@ -271,14 +272,14 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
271272
for {
272273
select {
273274
case <-shutdown:
274-
log.Println("Hang on, flushing any cached points before shutdown")
275+
log.Println("Hang on, flushing any cached metrics before shutdown")
275276
a.flush()
276277
return nil
277278
case <-ticker.C:
278279
a.flush()
279280
case m := <-metricC:
280281
for _, o := range a.Config.Outputs {
281-
o.AddPoint(m)
282+
o.AddMetric(m)
282283
}
283284
}
284285
}
@@ -318,8 +319,8 @@ func (a *Agent) Run(shutdown chan struct{}) error {
318319
a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet,
319320
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)
320321

321-
// channel shared between all input threads for accumulating points
322-
metricC := make(chan telegraf.Metric, 1000)
322+
// channel shared between all input threads for accumulating metrics
323+
metricC := make(chan telegraf.Metric, 10000)
323324

324325
// Round collection to nearest interval by sleeping
325326
if a.Config.Agent.RoundInterval {
@@ -342,7 +343,10 @@ func (a *Agent) Run(shutdown chan struct{}) error {
342343
// Start service of any ServicePlugins
343344
switch p := input.Input.(type) {
344345
case telegraf.ServiceInput:
345-
if err := p.Start(); err != nil {
346+
acc := NewAccumulator(input.Config, metricC)
347+
acc.SetDebug(a.Config.Agent.Debug)
348+
acc.setDefaultTags(a.Config.Tags)
349+
if err := p.Start(acc); err != nil {
346350
log.Printf("Service for input %s failed to start, exiting\n%s\n",
347351
input.Name, err.Error())
348352
return err

input.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type ServiceInput interface {
2424
Gather(Accumulator) error
2525

2626
// Start starts the ServiceInput's service, whatever that may be
27-
Start() error
27+
Start(Accumulator) error
2828

2929
// Stop stops the services and closes any necessary channels and connections
3030
Stop()

internal/config/config.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type AgentConfig struct {
6868
// same time, which can have a measurable effect on the system.
6969
CollectionJitter internal.Duration
7070

71-
// Interval at which to flush data
71+
// FlushInterval is the Interval at which to flush data
7272
FlushInterval internal.Duration
7373

7474
// FlushJitter Jitters the flush interval by a random amount.
@@ -82,6 +82,11 @@ type AgentConfig struct {
8282
// full, the oldest metrics will be overwritten.
8383
MetricBufferLimit int
8484

85+
// FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
86+
// it fills up, regardless of FlushInterval. Setting this option to true
87+
// does _not_ deactivate FlushInterval.
88+
FlushBufferWhenFull bool
89+
8590
// TODO(cam): Remove UTC and Precision parameters, they are no longer
8691
// valid for the agent config. Leaving them here for now for backwards-
8792
// compatability
@@ -157,6 +162,8 @@ var header = `##################################################################
157162
### Telegraf will cache metric_buffer_limit metrics for each output, and will
158163
### flush this buffer on a successful write.
159164
metric_buffer_limit = 10000
165+
### Flush the buffer whenever full, regardless of flush_interval.
166+
flush_buffer_when_full = true
160167
161168
### Collection jitter is used to jitter the collection by a random amount.
162169
### Each plugin will sleep for a random time within jitter before collecting.
@@ -421,8 +428,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
421428

422429
ro := internal_models.NewRunningOutput(name, output, outputConfig)
423430
if c.Agent.MetricBufferLimit > 0 {
424-
ro.PointBufferLimit = c.Agent.MetricBufferLimit
431+
ro.MetricBufferLimit = c.Agent.MetricBufferLimit
425432
}
433+
ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull
426434
ro.Quiet = c.Agent.Quiet
427435
c.Outputs = append(c.Outputs, ro)
428436
return nil

internal/models/running_output.go

+91-28
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,34 @@ package internal_models
22

33
import (
44
"log"
5+
"sync"
56
"time"
67

78
"github.com/influxdata/telegraf"
89
)
910

10-
const DEFAULT_POINT_BUFFER_LIMIT = 10000
11+
const (
12+
// Default number of metrics kept between flushes.
13+
DEFAULT_METRIC_BUFFER_LIMIT = 10000
14+
15+
// Limit how many full metric buffers are kept due to failed writes.
16+
FULL_METRIC_BUFFERS_LIMIT = 100
17+
)
1118

1219
type RunningOutput struct {
13-
Name string
14-
Output telegraf.Output
15-
Config *OutputConfig
16-
Quiet bool
17-
PointBufferLimit int
20+
Name string
21+
Output telegraf.Output
22+
Config *OutputConfig
23+
Quiet bool
24+
MetricBufferLimit int
25+
FlushBufferWhenFull bool
1826

19-
metrics []telegraf.Metric
20-
overwriteCounter int
27+
metrics []telegraf.Metric
28+
tmpmetrics map[int][]telegraf.Metric
29+
overwriteI int
30+
mapI int
31+
32+
sync.Mutex
2133
}
2234

2335
func NewRunningOutput(
@@ -26,47 +38,98 @@ func NewRunningOutput(
2638
conf *OutputConfig,
2739
) *RunningOutput {
2840
ro := &RunningOutput{
29-
Name: name,
30-
metrics: make([]telegraf.Metric, 0),
31-
Output: output,
32-
Config: conf,
33-
PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT,
41+
Name: name,
42+
metrics: make([]telegraf.Metric, 0),
43+
tmpmetrics: make(map[int][]telegraf.Metric),
44+
Output: output,
45+
Config: conf,
46+
MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
3447
}
3548
return ro
3649
}
3750

38-
func (ro *RunningOutput) AddPoint(point telegraf.Metric) {
51+
// AddMetric adds a metric to the output. This function can also write cached
52+
// points if FlushBufferWhenFull is true.
53+
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
3954
if ro.Config.Filter.IsActive {
40-
if !ro.Config.Filter.ShouldMetricPass(point) {
55+
if !ro.Config.Filter.ShouldMetricPass(metric) {
4156
return
4257
}
4358
}
4459

45-
if len(ro.metrics) < ro.PointBufferLimit {
46-
ro.metrics = append(ro.metrics, point)
60+
if len(ro.metrics) < ro.MetricBufferLimit {
61+
ro.Lock()
62+
ro.metrics = append(ro.metrics, metric)
63+
ro.Unlock()
4764
} else {
48-
log.Printf("WARNING: overwriting cached metrics, you may want to " +
49-
"increase the metric_buffer_limit setting in your [agent] config " +
50-
"if you do not wish to overwrite metrics.\n")
51-
if ro.overwriteCounter == len(ro.metrics) {
52-
ro.overwriteCounter = 0
65+
if ro.FlushBufferWhenFull {
66+
ro.Lock()
67+
tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
68+
copy(tmpmetrics, ro.metrics)
69+
ro.metrics = make([]telegraf.Metric, 0)
70+
ro.Unlock()
71+
err := ro.write(tmpmetrics)
72+
if err != nil {
73+
log.Printf("ERROR writing full metric buffer to output %s, %s",
74+
ro.Name, err)
75+
if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT {
76+
ro.mapI = 0
77+
// overwrite one
78+
ro.tmpmetrics[ro.mapI] = tmpmetrics
79+
ro.mapI++
80+
} else {
81+
ro.tmpmetrics[ro.mapI] = tmpmetrics
82+
ro.mapI++
83+
}
84+
}
85+
} else {
86+
log.Printf("WARNING: overwriting cached metrics, you may want to " +
87+
"increase the metric_buffer_limit setting in your [agent] " +
88+
"config if you do not wish to overwrite metrics.\n")
89+
ro.Lock()
90+
if ro.overwriteI == len(ro.metrics) {
91+
ro.overwriteI = 0
92+
}
93+
ro.metrics[ro.overwriteI] = metric
94+
ro.overwriteI++
95+
ro.Unlock()
5396
}
54-
ro.metrics[ro.overwriteCounter] = point
55-
ro.overwriteCounter++
5697
}
5798
}
5899

100+
// Write writes all cached points to this output.
59101
func (ro *RunningOutput) Write() error {
102+
ro.Lock()
103+
err := ro.write(ro.metrics)
104+
if err != nil {
105+
return err
106+
} else {
107+
ro.metrics = make([]telegraf.Metric, 0)
108+
ro.overwriteI = 0
109+
}
110+
ro.Unlock()
111+
112+
// Write any cached metric buffers that failed previously
113+
for i, tmpmetrics := range ro.tmpmetrics {
114+
if err := ro.write(tmpmetrics); err != nil {
115+
return err
116+
} else {
117+
delete(ro.tmpmetrics, i)
118+
}
119+
}
120+
121+
return nil
122+
}
123+
124+
func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
60125
start := time.Now()
61-
err := ro.Output.Write(ro.metrics)
126+
err := ro.Output.Write(metrics)
62127
elapsed := time.Since(start)
63128
if err == nil {
64129
if !ro.Quiet {
65130
log.Printf("Wrote %d metrics to output %s in %s\n",
66-
len(ro.metrics), ro.Name, elapsed)
131+
len(metrics), ro.Name, elapsed)
67132
}
68-
ro.metrics = make([]telegraf.Metric, 0)
69-
ro.overwriteCounter = 0
70133
}
71134
return err
72135
}

plugins/inputs/github_webhooks/github_webhooks.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (gh *GithubWebhooks) Listen() {
6161
}
6262
}
6363

64-
func (gh *GithubWebhooks) Start() error {
64+
func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error {
6565
go gh.Listen()
6666
log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress)
6767
return nil

plugins/inputs/kafka_consumer/kafka_consumer.go

+12-31
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package kafka_consumer
22

33
import (
4-
"fmt"
54
"log"
65
"strings"
76
"sync"
@@ -19,11 +18,13 @@ type Kafka struct {
1918
Topics []string
2019
ZookeeperPeers []string
2120
Consumer *consumergroup.ConsumerGroup
22-
MetricBuffer int
21+
22+
// Legacy metric buffer support
23+
MetricBuffer int
2324
// TODO remove PointBuffer, legacy support
2425
PointBuffer int
25-
Offset string
2626

27+
Offset string
2728
parser parsers.Parser
2829

2930
sync.Mutex
@@ -32,9 +33,10 @@ type Kafka struct {
3233
in <-chan *sarama.ConsumerMessage
3334
// channel for all kafka consumer errors
3435
errs <-chan *sarama.ConsumerError
35-
// channel for all incoming parsed kafka metrics
36-
metricC chan telegraf.Metric
37-
done chan struct{}
36+
done chan struct{}
37+
38+
// keep the accumulator internally:
39+
acc telegraf.Accumulator
3840

3941
// doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer
4042
// this is mostly for test purposes, but there may be a use-case for it later.
@@ -48,8 +50,6 @@ var sampleConfig = `
4850
zookeeper_peers = ["localhost:2181"]
4951
### the name of the consumer group
5052
consumer_group = "telegraf_metrics_consumers"
51-
### Maximum number of metrics to buffer between collection intervals
52-
metric_buffer = 100000
5353
### Offset (must be either "oldest" or "newest")
5454
offset = "oldest"
5555
@@ -72,11 +72,13 @@ func (k *Kafka) SetParser(parser parsers.Parser) {
7272
k.parser = parser
7373
}
7474

75-
func (k *Kafka) Start() error {
75+
func (k *Kafka) Start(acc telegraf.Accumulator) error {
7676
k.Lock()
7777
defer k.Unlock()
7878
var consumerErr error
7979

80+
k.acc = acc
81+
8082
config := consumergroup.NewConfig()
8183
switch strings.ToLower(k.Offset) {
8284
case "oldest", "":
@@ -106,13 +108,6 @@ func (k *Kafka) Start() error {
106108
}
107109

108110
k.done = make(chan struct{})
109-
if k.PointBuffer == 0 && k.MetricBuffer == 0 {
110-
k.MetricBuffer = 100000
111-
} else if k.PointBuffer > 0 {
112-
// Legacy support of PointBuffer field TODO remove
113-
k.MetricBuffer = k.PointBuffer
114-
}
115-
k.metricC = make(chan telegraf.Metric, k.MetricBuffer)
116111

117112
// Start the kafka message reader
118113
go k.receiver()
@@ -138,14 +133,7 @@ func (k *Kafka) receiver() {
138133
}
139134

140135
for _, metric := range metrics {
141-
fmt.Println(string(metric.Name()))
142-
select {
143-
case k.metricC <- metric:
144-
continue
145-
default:
146-
log.Printf("Kafka Consumer buffer is full, dropping a metric." +
147-
" You may want to increase the metric_buffer setting")
148-
}
136+
k.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
149137
}
150138

151139
if !k.doNotCommitMsgs {
@@ -169,13 +157,6 @@ func (k *Kafka) Stop() {
169157
}
170158

171159
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
172-
k.Lock()
173-
defer k.Unlock()
174-
nmetrics := len(k.metricC)
175-
for i := 0; i < nmetrics; i++ {
176-
metric := <-k.metricC
177-
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
178-
}
179160
return nil
180161
}
181162

0 commit comments

Comments
 (0)