|
| 1 | +package beanstalkd |
| 2 | + |
| 3 | +import ( |
| 4 | + "fmt" |
| 5 | + "io" |
| 6 | + "net/textproto" |
| 7 | + "sync" |
| 8 | + |
| 9 | + "github.com/influxdata/telegraf" |
| 10 | + "github.com/influxdata/telegraf/plugins/inputs" |
| 11 | + "gopkg.in/yaml.v2" |
| 12 | +) |
| 13 | + |
| 14 | +const sampleConfig = ` |
| 15 | + ## Server to collect data from |
| 16 | + server = "localhost:11300" |
| 17 | +
|
| 18 | + ## List of tubes to gather stats about. |
| 19 | + ## If no tubes specified then data gathered for each tube on server reported by list-tubes command |
| 20 | + tubes = ["notifications"] |
| 21 | +` |
| 22 | + |
| 23 | +type Beanstalkd struct { |
| 24 | + Server string `toml:"server"` |
| 25 | + Tubes []string `toml:"tubes"` |
| 26 | +} |
| 27 | + |
| 28 | +func (b *Beanstalkd) Description() string { |
| 29 | + return "Collects Beanstalkd server and tubes stats" |
| 30 | +} |
| 31 | + |
| 32 | +func (b *Beanstalkd) SampleConfig() string { |
| 33 | + return sampleConfig |
| 34 | +} |
| 35 | + |
| 36 | +func (b *Beanstalkd) Gather(acc telegraf.Accumulator) error { |
| 37 | + connection, err := textproto.Dial("tcp", b.Server) |
| 38 | + if err != nil { |
| 39 | + return err |
| 40 | + } |
| 41 | + defer connection.Close() |
| 42 | + |
| 43 | + tubes := b.Tubes |
| 44 | + if len(tubes) == 0 { |
| 45 | + err = runQuery(connection, "list-tubes", &tubes) |
| 46 | + if err != nil { |
| 47 | + acc.AddError(err) |
| 48 | + } |
| 49 | + } |
| 50 | + |
| 51 | + var wg sync.WaitGroup |
| 52 | + |
| 53 | + wg.Add(1) |
| 54 | + go func() { |
| 55 | + err := b.gatherServerStats(connection, acc) |
| 56 | + if err != nil { |
| 57 | + acc.AddError(err) |
| 58 | + } |
| 59 | + wg.Done() |
| 60 | + }() |
| 61 | + |
| 62 | + for _, tube := range tubes { |
| 63 | + wg.Add(1) |
| 64 | + go func(tube string) { |
| 65 | + b.gatherTubeStats(connection, tube, acc) |
| 66 | + wg.Done() |
| 67 | + }(tube) |
| 68 | + } |
| 69 | + |
| 70 | + wg.Wait() |
| 71 | + |
| 72 | + return nil |
| 73 | +} |
| 74 | + |
| 75 | +func (b *Beanstalkd) gatherServerStats(connection *textproto.Conn, acc telegraf.Accumulator) error { |
| 76 | + stats := new(statsResponse) |
| 77 | + if err := runQuery(connection, "stats", stats); err != nil { |
| 78 | + return err |
| 79 | + } |
| 80 | + |
| 81 | + acc.AddFields("beanstalkd_overview", |
| 82 | + map[string]interface{}{ |
| 83 | + "binlog_current_index": stats.BinlogCurrentIndex, |
| 84 | + "binlog_max_size": stats.BinlogMaxSize, |
| 85 | + "binlog_oldest_index": stats.BinlogOldestIndex, |
| 86 | + "binlog_records_migrated": stats.BinlogRecordsMigrated, |
| 87 | + "binlog_records_written": stats.BinlogRecordsWritten, |
| 88 | + "cmd_bury": stats.CmdBury, |
| 89 | + "cmd_delete": stats.CmdDelete, |
| 90 | + "cmd_ignore": stats.CmdIgnore, |
| 91 | + "cmd_kick": stats.CmdKick, |
| 92 | + "cmd_list_tube_used": stats.CmdListTubeUsed, |
| 93 | + "cmd_list_tubes": stats.CmdListTubes, |
| 94 | + "cmd_list_tubes_watched": stats.CmdListTubesWatched, |
| 95 | + "cmd_pause_tube": stats.CmdPauseTube, |
| 96 | + "cmd_peek": stats.CmdPeek, |
| 97 | + "cmd_peek_buried": stats.CmdPeekBuried, |
| 98 | + "cmd_peek_delayed": stats.CmdPeekDelayed, |
| 99 | + "cmd_peek_ready": stats.CmdPeekReady, |
| 100 | + "cmd_put": stats.CmdPut, |
| 101 | + "cmd_release": stats.CmdRelease, |
| 102 | + "cmd_reserve": stats.CmdReserve, |
| 103 | + "cmd_reserve_with_timeout": stats.CmdReserveWithTimeout, |
| 104 | + "cmd_stats": stats.CmdStats, |
| 105 | + "cmd_stats_job": stats.CmdStatsJob, |
| 106 | + "cmd_stats_tube": stats.CmdStatsTube, |
| 107 | + "cmd_touch": stats.CmdTouch, |
| 108 | + "cmd_use": stats.CmdUse, |
| 109 | + "cmd_watch": stats.CmdWatch, |
| 110 | + "current_connections": stats.CurrentConnections, |
| 111 | + "current_jobs_buried": stats.CurrentJobsBuried, |
| 112 | + "current_jobs_delayed": stats.CurrentJobsDelayed, |
| 113 | + "current_jobs_ready": stats.CurrentJobsReady, |
| 114 | + "current_jobs_reserved": stats.CurrentJobsReserved, |
| 115 | + "current_jobs_urgent": stats.CurrentJobsUrgent, |
| 116 | + "current_producers": stats.CurrentProducers, |
| 117 | + "current_tubes": stats.CurrentTubes, |
| 118 | + "current_waiting": stats.CurrentWaiting, |
| 119 | + "current_workers": stats.CurrentWorkers, |
| 120 | + "job_timeouts": stats.JobTimeouts, |
| 121 | + "max_job_size": stats.MaxJobSize, |
| 122 | + "pid": stats.Pid, |
| 123 | + "rusage_stime": stats.RusageStime, |
| 124 | + "rusage_utime": stats.RusageUtime, |
| 125 | + "total_connections": stats.TotalConnections, |
| 126 | + "total_jobs": stats.TotalJobs, |
| 127 | + "uptime": stats.Uptime, |
| 128 | + }, |
| 129 | + map[string]string{ |
| 130 | + "hostname": stats.Hostname, |
| 131 | + "id": stats.Id, |
| 132 | + "server": b.Server, |
| 133 | + "version": stats.Version, |
| 134 | + }, |
| 135 | + ) |
| 136 | + |
| 137 | + return nil |
| 138 | +} |
| 139 | + |
| 140 | +func (b *Beanstalkd) gatherTubeStats(connection *textproto.Conn, tube string, acc telegraf.Accumulator) error { |
| 141 | + stats := new(statsTubeResponse) |
| 142 | + if err := runQuery(connection, "stats-tube "+tube, stats); err != nil { |
| 143 | + return err |
| 144 | + } |
| 145 | + |
| 146 | + acc.AddFields("beanstalkd_tube", |
| 147 | + map[string]interface{}{ |
| 148 | + "cmd_delete": stats.CmdDelete, |
| 149 | + "cmd_pause_tube": stats.CmdPauseTube, |
| 150 | + "current_jobs_buried": stats.CurrentJobsBuried, |
| 151 | + "current_jobs_delayed": stats.CurrentJobsDelayed, |
| 152 | + "current_jobs_ready": stats.CurrentJobsReady, |
| 153 | + "current_jobs_reserved": stats.CurrentJobsReserved, |
| 154 | + "current_jobs_urgent": stats.CurrentJobsUrgent, |
| 155 | + "current_using": stats.CurrentUsing, |
| 156 | + "current_waiting": stats.CurrentWaiting, |
| 157 | + "current_watching": stats.CurrentWatching, |
| 158 | + "pause": stats.Pause, |
| 159 | + "pause_time_left": stats.PauseTimeLeft, |
| 160 | + "total_jobs": stats.TotalJobs, |
| 161 | + }, |
| 162 | + map[string]string{ |
| 163 | + "name": stats.Name, |
| 164 | + "server": b.Server, |
| 165 | + }, |
| 166 | + ) |
| 167 | + |
| 168 | + return nil |
| 169 | +} |
| 170 | + |
| 171 | +func runQuery(connection *textproto.Conn, cmd string, result interface{}) error { |
| 172 | + requestId, err := connection.Cmd(cmd) |
| 173 | + if err != nil { |
| 174 | + return err |
| 175 | + } |
| 176 | + |
| 177 | + connection.StartResponse(requestId) |
| 178 | + defer connection.EndResponse(requestId) |
| 179 | + |
| 180 | + status, err := connection.ReadLine() |
| 181 | + if err != nil { |
| 182 | + return err |
| 183 | + } |
| 184 | + |
| 185 | + size := 0 |
| 186 | + if _, err = fmt.Sscanf(status, "OK %d", &size); err != nil { |
| 187 | + return err |
| 188 | + } |
| 189 | + |
| 190 | + body := make([]byte, size+2) |
| 191 | + if _, err = io.ReadFull(connection.R, body); err != nil { |
| 192 | + return err |
| 193 | + } |
| 194 | + |
| 195 | + return yaml.Unmarshal(body, result) |
| 196 | +} |
| 197 | + |
| 198 | +func init() { |
| 199 | + inputs.Add("beanstalkd", func() telegraf.Input { |
| 200 | + return &Beanstalkd{} |
| 201 | + }) |
| 202 | +} |
| 203 | + |
| 204 | +type statsResponse struct { |
| 205 | + BinlogCurrentIndex int `yaml:"binlog-current-index"` |
| 206 | + BinlogMaxSize int `yaml:"binlog-max-size"` |
| 207 | + BinlogOldestIndex int `yaml:"binlog-oldest-index"` |
| 208 | + BinlogRecordsMigrated int `yaml:"binlog-records-migrated"` |
| 209 | + BinlogRecordsWritten int `yaml:"binlog-records-written"` |
| 210 | + CmdBury int `yaml:"cmd-bury"` |
| 211 | + CmdDelete int `yaml:"cmd-delete"` |
| 212 | + CmdIgnore int `yaml:"cmd-ignore"` |
| 213 | + CmdKick int `yaml:"cmd-kick"` |
| 214 | + CmdListTubeUsed int `yaml:"cmd-list-tube-used"` |
| 215 | + CmdListTubes int `yaml:"cmd-list-tubes"` |
| 216 | + CmdListTubesWatched int `yaml:"cmd-list-tubes-watched"` |
| 217 | + CmdPauseTube int `yaml:"cmd-pause-tube"` |
| 218 | + CmdPeek int `yaml:"cmd-peek"` |
| 219 | + CmdPeekBuried int `yaml:"cmd-peek-buried"` |
| 220 | + CmdPeekDelayed int `yaml:"cmd-peek-delayed"` |
| 221 | + CmdPeekReady int `yaml:"cmd-peek-ready"` |
| 222 | + CmdPut int `yaml:"cmd-put"` |
| 223 | + CmdRelease int `yaml:"cmd-release"` |
| 224 | + CmdReserve int `yaml:"cmd-reserve"` |
| 225 | + CmdReserveWithTimeout int `yaml:"cmd-reserve-with-timeout"` |
| 226 | + CmdStats int `yaml:"cmd-stats"` |
| 227 | + CmdStatsJob int `yaml:"cmd-stats-job"` |
| 228 | + CmdStatsTube int `yaml:"cmd-stats-tube"` |
| 229 | + CmdTouch int `yaml:"cmd-touch"` |
| 230 | + CmdUse int `yaml:"cmd-use"` |
| 231 | + CmdWatch int `yaml:"cmd-watch"` |
| 232 | + CurrentConnections int `yaml:"current-connections"` |
| 233 | + CurrentJobsBuried int `yaml:"current-jobs-buried"` |
| 234 | + CurrentJobsDelayed int `yaml:"current-jobs-delayed"` |
| 235 | + CurrentJobsReady int `yaml:"current-jobs-ready"` |
| 236 | + CurrentJobsReserved int `yaml:"current-jobs-reserved"` |
| 237 | + CurrentJobsUrgent int `yaml:"current-jobs-urgent"` |
| 238 | + CurrentProducers int `yaml:"current-producers"` |
| 239 | + CurrentTubes int `yaml:"current-tubes"` |
| 240 | + CurrentWaiting int `yaml:"current-waiting"` |
| 241 | + CurrentWorkers int `yaml:"current-workers"` |
| 242 | + Hostname string `yaml:"hostname"` |
| 243 | + Id string `yaml:"id"` |
| 244 | + JobTimeouts int `yaml:"job-timeouts"` |
| 245 | + MaxJobSize int `yaml:"max-job-size"` |
| 246 | + Pid int `yaml:"pid"` |
| 247 | + RusageStime float64 `yaml:"rusage-stime"` |
| 248 | + RusageUtime float64 `yaml:"rusage-utime"` |
| 249 | + TotalConnections int `yaml:"total-connections"` |
| 250 | + TotalJobs int `yaml:"total-jobs"` |
| 251 | + Uptime int `yaml:"uptime"` |
| 252 | + Version string `yaml:"version"` |
| 253 | +} |
| 254 | + |
| 255 | +type statsTubeResponse struct { |
| 256 | + CmdDelete int `yaml:"cmd-delete"` |
| 257 | + CmdPauseTube int `yaml:"cmd-pause-tube"` |
| 258 | + CurrentJobsBuried int `yaml:"current-jobs-buried"` |
| 259 | + CurrentJobsDelayed int `yaml:"current-jobs-delayed"` |
| 260 | + CurrentJobsReady int `yaml:"current-jobs-ready"` |
| 261 | + CurrentJobsReserved int `yaml:"current-jobs-reserved"` |
| 262 | + CurrentJobsUrgent int `yaml:"current-jobs-urgent"` |
| 263 | + CurrentUsing int `yaml:"current-using"` |
| 264 | + CurrentWaiting int `yaml:"current-waiting"` |
| 265 | + CurrentWatching int `yaml:"current-watching"` |
| 266 | + Name string `yaml:"name"` |
| 267 | + Pause int `yaml:"pause"` |
| 268 | + PauseTimeLeft int `yaml:"pause-time-left"` |
| 269 | + TotalJobs int `yaml:"total-jobs"` |
| 270 | +} |
0 commit comments