Skip to content

Commit b737b77

Browse files
44pxotherpirate
authored andcommitted
Add Beanstalkd input plugin (influxdata#4272)
1 parent 3fe6923 commit b737b77

File tree

4 files changed

+701
-0
lines changed

4 files changed

+701
-0
lines changed

plugins/inputs/all/all.go

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
_ "github.com/influxdata/telegraf/plugins/inputs/apache"
88
_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
99
_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
10+
_ "github.com/influxdata/telegraf/plugins/inputs/beanstalkd"
1011
_ "github.com/influxdata/telegraf/plugins/inputs/bond"
1112
_ "github.com/influxdata/telegraf/plugins/inputs/burrow"
1213
_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"

plugins/inputs/beanstalkd/README.md

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# Beanstalkd Input Plugin
2+
3+
The `beanstalkd` plugin collects server stats as well as tube stats (reported by `stats` and `stats-tube` commands respectively).
4+
5+
### Configuration:
6+
7+
```toml
8+
[[inputs.beanstalkd]]
9+
## Server to collect data from
10+
server = "localhost:11300"
11+
12+
## List of tubes to gather stats about.
13+
## If no tubes specified then data gathered for each tube on server reported by list-tubes command
14+
tubes = ["notifications"]
15+
```
16+
17+
### Metrics:
18+
19+
Please see the [Beanstalk Protocol doc](https://raw.githubusercontent.com/kr/beanstalkd/master/doc/protocol.txt) for detailed explanation of `stats` and `stats-tube` commands output.
20+
21+
`beanstalkd_overview` – statistical information about the system as a whole
22+
- fields
23+
- cmd_delete
24+
- cmd_pause_tube
25+
- current_jobs_buried
26+
- current_jobs_delayed
27+
- current_jobs_ready
28+
- current_jobs_reserved
29+
- current_jobs_urgent
30+
- current_using
31+
- current_waiting
32+
- current_watching
33+
- pause
34+
- pause_time_left
35+
- total_jobs
36+
- tags
37+
- name
38+
- server (address taken from config)
39+
40+
`beanstalkd_tube` – statistical information about the specified tube
41+
- fields
42+
- binlog_current_index
43+
- binlog_max_size
44+
- binlog_oldest_index
45+
- binlog_records_migrated
46+
- binlog_records_written
47+
- cmd_bury
48+
- cmd_delete
49+
- cmd_ignore
50+
- cmd_kick
51+
- cmd_list_tube_used
52+
- cmd_list_tubes
53+
- cmd_list_tubes_watched
54+
- cmd_pause_tube
55+
- cmd_peek
56+
- cmd_peek_buried
57+
- cmd_peek_delayed
58+
- cmd_peek_ready
59+
- cmd_put
60+
- cmd_release
61+
- cmd_reserve
62+
- cmd_reserve_with_timeout
63+
- cmd_stats
64+
- cmd_stats_job
65+
- cmd_stats_tube
66+
- cmd_touch
67+
- cmd_use
68+
- cmd_watch
69+
- current_connections
70+
- current_jobs_buried
71+
- current_jobs_delayed
72+
- current_jobs_ready
73+
- current_jobs_reserved
74+
- current_jobs_urgent
75+
- current_producers
76+
- current_tubes
77+
- current_waiting
78+
- current_workers
79+
- job_timeouts
80+
- max_job_size
81+
- pid
82+
- rusage_stime
83+
- rusage_utime
84+
- total_connections
85+
- total_jobs
86+
- uptime
87+
- tags
88+
- hostname
89+
- id
90+
- server (address taken from config)
91+
- version
92+
93+
### Example Output:
94+
```
95+
beanstalkd_overview,host=server.local,hostname=a2ab22ed12e0,id=232485800aa11b24,server=localhost:11300,version=1.10 cmd_stats_tube=29482i,current_jobs_delayed=0i,current_jobs_urgent=6i,cmd_kick=0i,cmd_stats=7378i,cmd_stats_job=0i,current_waiting=0i,max_job_size=65535i,pid=6i,cmd_bury=0i,cmd_reserve_with_timeout=0i,cmd_touch=0i,current_connections=1i,current_jobs_ready=6i,current_producers=0i,cmd_delete=0i,cmd_list_tubes=7369i,cmd_peek_ready=0i,cmd_put=6i,cmd_use=3i,cmd_watch=0i,current_jobs_reserved=0i,rusage_stime=6.07,cmd_list_tubes_watched=0i,cmd_pause_tube=0i,total_jobs=6i,binlog_records_migrated=0i,cmd_list_tube_used=0i,cmd_peek_delayed=0i,cmd_release=0i,current_jobs_buried=0i,job_timeouts=0i,binlog_current_index=0i,binlog_max_size=10485760i,total_connections=7378i,cmd_peek_buried=0i,cmd_reserve=0i,current_tubes=4i,binlog_records_written=0i,cmd_peek=0i,rusage_utime=1.13,uptime=7099i,binlog_oldest_index=0i,current_workers=0i,cmd_ignore=0i 1528801650000000000
96+
97+
beanstalkd_tube,host=server.local,name=notifications,server=localhost:11300 pause_time_left=0i,current_jobs_buried=0i,current_jobs_delayed=0i,current_jobs_reserved=0i,current_using=0i,current_waiting=0i,pause=0i,total_jobs=3i,cmd_delete=0i,cmd_pause_tube=0i,current_jobs_ready=3i,current_jobs_urgent=3i,current_watching=0i 1528801650000000000
98+
```
+270
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
package beanstalkd
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/textproto"
7+
"sync"
8+
9+
"github.com/influxdata/telegraf"
10+
"github.com/influxdata/telegraf/plugins/inputs"
11+
"gopkg.in/yaml.v2"
12+
)
13+
14+
const sampleConfig = `
15+
## Server to collect data from
16+
server = "localhost:11300"
17+
18+
## List of tubes to gather stats about.
19+
## If no tubes specified then data gathered for each tube on server reported by list-tubes command
20+
tubes = ["notifications"]
21+
`
22+
23+
type Beanstalkd struct {
24+
Server string `toml:"server"`
25+
Tubes []string `toml:"tubes"`
26+
}
27+
28+
func (b *Beanstalkd) Description() string {
29+
return "Collects Beanstalkd server and tubes stats"
30+
}
31+
32+
func (b *Beanstalkd) SampleConfig() string {
33+
return sampleConfig
34+
}
35+
36+
func (b *Beanstalkd) Gather(acc telegraf.Accumulator) error {
37+
connection, err := textproto.Dial("tcp", b.Server)
38+
if err != nil {
39+
return err
40+
}
41+
defer connection.Close()
42+
43+
tubes := b.Tubes
44+
if len(tubes) == 0 {
45+
err = runQuery(connection, "list-tubes", &tubes)
46+
if err != nil {
47+
acc.AddError(err)
48+
}
49+
}
50+
51+
var wg sync.WaitGroup
52+
53+
wg.Add(1)
54+
go func() {
55+
err := b.gatherServerStats(connection, acc)
56+
if err != nil {
57+
acc.AddError(err)
58+
}
59+
wg.Done()
60+
}()
61+
62+
for _, tube := range tubes {
63+
wg.Add(1)
64+
go func(tube string) {
65+
b.gatherTubeStats(connection, tube, acc)
66+
wg.Done()
67+
}(tube)
68+
}
69+
70+
wg.Wait()
71+
72+
return nil
73+
}
74+
75+
func (b *Beanstalkd) gatherServerStats(connection *textproto.Conn, acc telegraf.Accumulator) error {
76+
stats := new(statsResponse)
77+
if err := runQuery(connection, "stats", stats); err != nil {
78+
return err
79+
}
80+
81+
acc.AddFields("beanstalkd_overview",
82+
map[string]interface{}{
83+
"binlog_current_index": stats.BinlogCurrentIndex,
84+
"binlog_max_size": stats.BinlogMaxSize,
85+
"binlog_oldest_index": stats.BinlogOldestIndex,
86+
"binlog_records_migrated": stats.BinlogRecordsMigrated,
87+
"binlog_records_written": stats.BinlogRecordsWritten,
88+
"cmd_bury": stats.CmdBury,
89+
"cmd_delete": stats.CmdDelete,
90+
"cmd_ignore": stats.CmdIgnore,
91+
"cmd_kick": stats.CmdKick,
92+
"cmd_list_tube_used": stats.CmdListTubeUsed,
93+
"cmd_list_tubes": stats.CmdListTubes,
94+
"cmd_list_tubes_watched": stats.CmdListTubesWatched,
95+
"cmd_pause_tube": stats.CmdPauseTube,
96+
"cmd_peek": stats.CmdPeek,
97+
"cmd_peek_buried": stats.CmdPeekBuried,
98+
"cmd_peek_delayed": stats.CmdPeekDelayed,
99+
"cmd_peek_ready": stats.CmdPeekReady,
100+
"cmd_put": stats.CmdPut,
101+
"cmd_release": stats.CmdRelease,
102+
"cmd_reserve": stats.CmdReserve,
103+
"cmd_reserve_with_timeout": stats.CmdReserveWithTimeout,
104+
"cmd_stats": stats.CmdStats,
105+
"cmd_stats_job": stats.CmdStatsJob,
106+
"cmd_stats_tube": stats.CmdStatsTube,
107+
"cmd_touch": stats.CmdTouch,
108+
"cmd_use": stats.CmdUse,
109+
"cmd_watch": stats.CmdWatch,
110+
"current_connections": stats.CurrentConnections,
111+
"current_jobs_buried": stats.CurrentJobsBuried,
112+
"current_jobs_delayed": stats.CurrentJobsDelayed,
113+
"current_jobs_ready": stats.CurrentJobsReady,
114+
"current_jobs_reserved": stats.CurrentJobsReserved,
115+
"current_jobs_urgent": stats.CurrentJobsUrgent,
116+
"current_producers": stats.CurrentProducers,
117+
"current_tubes": stats.CurrentTubes,
118+
"current_waiting": stats.CurrentWaiting,
119+
"current_workers": stats.CurrentWorkers,
120+
"job_timeouts": stats.JobTimeouts,
121+
"max_job_size": stats.MaxJobSize,
122+
"pid": stats.Pid,
123+
"rusage_stime": stats.RusageStime,
124+
"rusage_utime": stats.RusageUtime,
125+
"total_connections": stats.TotalConnections,
126+
"total_jobs": stats.TotalJobs,
127+
"uptime": stats.Uptime,
128+
},
129+
map[string]string{
130+
"hostname": stats.Hostname,
131+
"id": stats.Id,
132+
"server": b.Server,
133+
"version": stats.Version,
134+
},
135+
)
136+
137+
return nil
138+
}
139+
140+
func (b *Beanstalkd) gatherTubeStats(connection *textproto.Conn, tube string, acc telegraf.Accumulator) error {
141+
stats := new(statsTubeResponse)
142+
if err := runQuery(connection, "stats-tube "+tube, stats); err != nil {
143+
return err
144+
}
145+
146+
acc.AddFields("beanstalkd_tube",
147+
map[string]interface{}{
148+
"cmd_delete": stats.CmdDelete,
149+
"cmd_pause_tube": stats.CmdPauseTube,
150+
"current_jobs_buried": stats.CurrentJobsBuried,
151+
"current_jobs_delayed": stats.CurrentJobsDelayed,
152+
"current_jobs_ready": stats.CurrentJobsReady,
153+
"current_jobs_reserved": stats.CurrentJobsReserved,
154+
"current_jobs_urgent": stats.CurrentJobsUrgent,
155+
"current_using": stats.CurrentUsing,
156+
"current_waiting": stats.CurrentWaiting,
157+
"current_watching": stats.CurrentWatching,
158+
"pause": stats.Pause,
159+
"pause_time_left": stats.PauseTimeLeft,
160+
"total_jobs": stats.TotalJobs,
161+
},
162+
map[string]string{
163+
"name": stats.Name,
164+
"server": b.Server,
165+
},
166+
)
167+
168+
return nil
169+
}
170+
171+
func runQuery(connection *textproto.Conn, cmd string, result interface{}) error {
172+
requestId, err := connection.Cmd(cmd)
173+
if err != nil {
174+
return err
175+
}
176+
177+
connection.StartResponse(requestId)
178+
defer connection.EndResponse(requestId)
179+
180+
status, err := connection.ReadLine()
181+
if err != nil {
182+
return err
183+
}
184+
185+
size := 0
186+
if _, err = fmt.Sscanf(status, "OK %d", &size); err != nil {
187+
return err
188+
}
189+
190+
body := make([]byte, size+2)
191+
if _, err = io.ReadFull(connection.R, body); err != nil {
192+
return err
193+
}
194+
195+
return yaml.Unmarshal(body, result)
196+
}
197+
198+
func init() {
199+
inputs.Add("beanstalkd", func() telegraf.Input {
200+
return &Beanstalkd{}
201+
})
202+
}
203+
204+
type statsResponse struct {
205+
BinlogCurrentIndex int `yaml:"binlog-current-index"`
206+
BinlogMaxSize int `yaml:"binlog-max-size"`
207+
BinlogOldestIndex int `yaml:"binlog-oldest-index"`
208+
BinlogRecordsMigrated int `yaml:"binlog-records-migrated"`
209+
BinlogRecordsWritten int `yaml:"binlog-records-written"`
210+
CmdBury int `yaml:"cmd-bury"`
211+
CmdDelete int `yaml:"cmd-delete"`
212+
CmdIgnore int `yaml:"cmd-ignore"`
213+
CmdKick int `yaml:"cmd-kick"`
214+
CmdListTubeUsed int `yaml:"cmd-list-tube-used"`
215+
CmdListTubes int `yaml:"cmd-list-tubes"`
216+
CmdListTubesWatched int `yaml:"cmd-list-tubes-watched"`
217+
CmdPauseTube int `yaml:"cmd-pause-tube"`
218+
CmdPeek int `yaml:"cmd-peek"`
219+
CmdPeekBuried int `yaml:"cmd-peek-buried"`
220+
CmdPeekDelayed int `yaml:"cmd-peek-delayed"`
221+
CmdPeekReady int `yaml:"cmd-peek-ready"`
222+
CmdPut int `yaml:"cmd-put"`
223+
CmdRelease int `yaml:"cmd-release"`
224+
CmdReserve int `yaml:"cmd-reserve"`
225+
CmdReserveWithTimeout int `yaml:"cmd-reserve-with-timeout"`
226+
CmdStats int `yaml:"cmd-stats"`
227+
CmdStatsJob int `yaml:"cmd-stats-job"`
228+
CmdStatsTube int `yaml:"cmd-stats-tube"`
229+
CmdTouch int `yaml:"cmd-touch"`
230+
CmdUse int `yaml:"cmd-use"`
231+
CmdWatch int `yaml:"cmd-watch"`
232+
CurrentConnections int `yaml:"current-connections"`
233+
CurrentJobsBuried int `yaml:"current-jobs-buried"`
234+
CurrentJobsDelayed int `yaml:"current-jobs-delayed"`
235+
CurrentJobsReady int `yaml:"current-jobs-ready"`
236+
CurrentJobsReserved int `yaml:"current-jobs-reserved"`
237+
CurrentJobsUrgent int `yaml:"current-jobs-urgent"`
238+
CurrentProducers int `yaml:"current-producers"`
239+
CurrentTubes int `yaml:"current-tubes"`
240+
CurrentWaiting int `yaml:"current-waiting"`
241+
CurrentWorkers int `yaml:"current-workers"`
242+
Hostname string `yaml:"hostname"`
243+
Id string `yaml:"id"`
244+
JobTimeouts int `yaml:"job-timeouts"`
245+
MaxJobSize int `yaml:"max-job-size"`
246+
Pid int `yaml:"pid"`
247+
RusageStime float64 `yaml:"rusage-stime"`
248+
RusageUtime float64 `yaml:"rusage-utime"`
249+
TotalConnections int `yaml:"total-connections"`
250+
TotalJobs int `yaml:"total-jobs"`
251+
Uptime int `yaml:"uptime"`
252+
Version string `yaml:"version"`
253+
}
254+
255+
type statsTubeResponse struct {
256+
CmdDelete int `yaml:"cmd-delete"`
257+
CmdPauseTube int `yaml:"cmd-pause-tube"`
258+
CurrentJobsBuried int `yaml:"current-jobs-buried"`
259+
CurrentJobsDelayed int `yaml:"current-jobs-delayed"`
260+
CurrentJobsReady int `yaml:"current-jobs-ready"`
261+
CurrentJobsReserved int `yaml:"current-jobs-reserved"`
262+
CurrentJobsUrgent int `yaml:"current-jobs-urgent"`
263+
CurrentUsing int `yaml:"current-using"`
264+
CurrentWaiting int `yaml:"current-waiting"`
265+
CurrentWatching int `yaml:"current-watching"`
266+
Name string `yaml:"name"`
267+
Pause int `yaml:"pause"`
268+
PauseTimeLeft int `yaml:"pause-time-left"`
269+
TotalJobs int `yaml:"total-jobs"`
270+
}

0 commit comments

Comments
 (0)