9
9
"sync"
10
10
"time"
11
11
12
- "github.com/influxdb/telegraf/internal"
13
12
"github.com/influxdb/telegraf/internal/config"
14
13
"github.com/influxdb/telegraf/outputs"
15
14
"github.com/influxdb/telegraf/plugins"
@@ -19,77 +18,27 @@ import (
19
18
20
19
// Agent runs telegraf and collects data based on the given config
21
20
type Agent struct {
22
-
23
- // Interval at which to gather information
24
- Interval internal.Duration
25
-
26
- // RoundInterval rounds collection interval to 'interval'.
27
- // ie, if Interval=10s then always collect on :00, :10, :20, etc.
28
- RoundInterval bool
29
-
30
- // Interval at which to flush data
31
- FlushInterval internal.Duration
32
-
33
- // FlushRetries is the number of times to retry each data flush
34
- FlushRetries int
35
-
36
- // FlushJitter tells
37
- FlushJitter internal.Duration
38
-
39
- // TODO(cam): Remove UTC and Precision parameters, they are no longer
40
- // valid for the agent config. Leaving them here for now for backwards-
41
- // compatability
42
-
43
- // Option for outputting data in UTC
44
- UTC bool `toml:"utc"`
45
-
46
- // Precision to write data at
47
- // Valid values for Precision are n, u, ms, s, m, and h
48
- Precision string
49
-
50
- // Option for running in debug mode
51
- Debug bool
52
- Hostname string
53
-
54
- Tags map [string ]string
55
-
56
21
Config * config.Config
57
22
}
58
23
59
24
// NewAgent returns an Agent struct based off the given Config
60
25
func NewAgent (config * config.Config ) (* Agent , error ) {
61
- agent := & Agent {
62
- Tags : make (map [string ]string ),
63
- Config : config ,
64
- Interval : internal.Duration {10 * time .Second },
65
- RoundInterval : true ,
66
- FlushInterval : internal.Duration {10 * time .Second },
67
- FlushRetries : 2 ,
68
- FlushJitter : internal.Duration {5 * time .Second },
26
+ a := & Agent {
27
+ Config : config ,
69
28
}
70
29
71
- // Apply the toml table to the agent config, overriding defaults
72
- err := config .ApplyAgent (agent )
73
- if err != nil {
74
- return nil , err
75
- }
76
-
77
- if agent .Hostname == "" {
30
+ if a .Config .Agent .Hostname == "" {
78
31
hostname , err := os .Hostname ()
79
32
if err != nil {
80
33
return nil , err
81
34
}
82
35
83
- agent .Hostname = hostname
84
- }
85
-
86
- if config .Tags == nil {
87
- config .Tags = map [string ]string {}
36
+ a .Config .Agent .Hostname = hostname
88
37
}
89
38
90
- config .Tags ["host" ] = agent .Hostname
39
+ config .Tags ["host" ] = a . Config . Agent .Hostname
91
40
92
- return agent , nil
41
+ return a , nil
93
42
}
94
43
95
44
// Connect connects to all configured outputs
@@ -104,7 +53,7 @@ func (a *Agent) Connect() error {
104
53
}
105
54
}
106
55
107
- if a .Debug {
56
+ if a .Config . Agent . Debug {
108
57
log .Printf ("Attempting connection to output: %s\n " , o .Name )
109
58
}
110
59
err := o .Output .Connect ()
@@ -116,7 +65,7 @@ func (a *Agent) Connect() error {
116
65
return err
117
66
}
118
67
}
119
- if a .Debug {
68
+ if a .Config . Agent . Debug {
120
69
log .Printf ("Successfully connected to output: %s\n " , o .Name )
121
70
}
122
71
}
@@ -154,9 +103,9 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
154
103
defer wg .Done ()
155
104
156
105
acc := NewAccumulator (plugin .Config , pointChan )
157
- acc .SetDebug (a .Debug )
106
+ acc .SetDebug (a .Config . Agent . Debug )
158
107
acc .SetPrefix (plugin .Name + "_" )
159
- acc .SetDefaultTags (a .Tags )
108
+ acc .SetDefaultTags (a .Config . Tags )
160
109
161
110
if err := plugin .Plugin .Gather (acc ); err != nil {
162
111
log .Printf ("Error in plugin [%s]: %s" , plugin .Name , err )
@@ -169,7 +118,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {
169
118
170
119
elapsed := time .Since (start )
171
120
log .Printf ("Gathered metrics, (%s interval), from %d plugins in %s\n " ,
172
- a .Interval , counter , elapsed )
121
+ a .Config . Agent . Interval , counter , elapsed )
173
122
return nil
174
123
}
175
124
@@ -187,9 +136,9 @@ func (a *Agent) gatherSeparate(
187
136
start := time .Now ()
188
137
189
138
acc := NewAccumulator (plugin .Config , pointChan )
190
- acc .SetDebug (a .Debug )
139
+ acc .SetDebug (a .Config . Agent . Debug )
191
140
acc .SetPrefix (plugin .Name + "_" )
192
- acc .SetDefaultTags (a .Tags )
141
+ acc .SetDefaultTags (a .Config . Tags )
193
142
194
143
if err := plugin .Plugin .Gather (acc ); err != nil {
195
144
log .Printf ("Error in plugin [%s]: %s" , plugin .Name , err )
@@ -273,7 +222,7 @@ func (a *Agent) writeOutput(
273
222
return
274
223
}
275
224
retry := 0
276
- retries := a .FlushRetries
225
+ retries := a .Config . Agent . FlushRetries
277
226
start := time .Now ()
278
227
279
228
for {
@@ -299,8 +248,8 @@ func (a *Agent) writeOutput(
299
248
} else if err != nil {
300
249
// Sleep for a retry
301
250
log .Printf ("Error in output [%s]: %s, retrying in %s" ,
302
- ro .Name , err .Error (), a .FlushInterval .Duration )
303
- time .Sleep (a .FlushInterval .Duration )
251
+ ro .Name , err .Error (), a .Config . Agent . FlushInterval .Duration )
252
+ time .Sleep (a .Config . Agent . FlushInterval .Duration )
304
253
}
305
254
}
306
255
@@ -330,7 +279,7 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er
330
279
// the flusher will flush after metrics are collected.
331
280
time .Sleep (time .Millisecond * 100 )
332
281
333
- ticker := time .NewTicker (a .FlushInterval .Duration )
282
+ ticker := time .NewTicker (a .Config . Agent . FlushInterval .Duration )
334
283
points := make ([]* client.Point , 0 )
335
284
336
285
for {
@@ -373,22 +322,23 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration {
373
322
func (a * Agent ) Run (shutdown chan struct {}) error {
374
323
var wg sync.WaitGroup
375
324
376
- a .FlushInterval .Duration = jitterInterval (a .FlushInterval .Duration ,
377
- a .FlushJitter .Duration )
325
+ a .Config . Agent . FlushInterval .Duration = jitterInterval (a . Config . Agent .FlushInterval .Duration ,
326
+ a .Config . Agent . FlushJitter .Duration )
378
327
379
328
log .Printf ("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v, " +
380
329
"Flush Interval:%s\n " ,
381
- a .Interval , a .Debug , a .Hostname , a .FlushInterval )
330
+ a .Config .Agent .Interval , a .Config .Agent .Debug ,
331
+ a .Config .Agent .Hostname , a .Config .Agent .FlushInterval )
382
332
383
333
// channel shared between all plugin threads for accumulating points
384
334
pointChan := make (chan * client.Point , 1000 )
385
335
386
336
// Round collection to nearest interval by sleeping
387
- if a .RoundInterval {
388
- i := int64 (a .Interval .Duration )
337
+ if a .Config . Agent . RoundInterval {
338
+ i := int64 (a .Config . Agent . Interval .Duration )
389
339
time .Sleep (time .Duration (i - (time .Now ().UnixNano () % i )))
390
340
}
391
- ticker := time .NewTicker (a .Interval .Duration )
341
+ ticker := time .NewTicker (a .Config . Agent . Interval .Duration )
392
342
393
343
wg .Add (1 )
394
344
go func () {
0 commit comments