Skip to content

Commit b789acf

Browse files
committed
Cross platform support for the 'processes' plugin
closes #798
1 parent be21fef commit b789acf

File tree

7 files changed

+381
-35
lines changed

7 files changed

+381
-35
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions.
1515
- [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert!
1616
- [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998!
17+
- [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert!
1718

1819
### Bugfixes
1920
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
@@ -24,6 +25,7 @@
2425
- [#773](https://github.com/influxdata/telegraf/issues/773): Fix duplicate measurements in snmp plugin. Thanks @titilambert!
2526
- [#708](https://github.com/influxdata/telegraf/issues/708): packaging: build ARM package
2627
- [#713](https://github.com/influxdata/telegraf/issues/713): packaging: insecure permissions error on log directory
28+
- [#816](https://github.com/influxdata/telegraf/issues/816): Fix phpfpm panic if fcgi endpoint unreachable.
2729

2830
## v0.10.4.1
2931

README.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,13 @@ Currently implemented sources:
214214
* disk
215215
* diskio
216216
* swap
217+
* processes
217218

218219
Telegraf can also collect metrics via the following service plugins:
219220

220221
* statsd
221-
* udp listener
222+
* udp_listener
223+
* tcp_listener
222224
* mqtt_consumer
223225
* kafka_consumer
224226
* nats_consumer

etc/telegraf.conf

+4
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@
123123
[[inputs.mem]]
124124
# no configuration
125125

126+
# Get the number of processes and group them by status
127+
[[inputs.processes]]
128+
# no configuration
129+
126130
# Read metrics about swap memory usage
127131
[[inputs.swap]]
128132
# no configuration
+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Processes Input Plugin
2+
3+
This plugin gathers info about the total number of processes and groups
4+
them by status (zombie, sleeping, running, etc.)
5+
6+
On Linux this plugin requires access to procfs (/proc); on other OSes
7+
it requires access to execute `ps`.
8+
9+
### Configuration:
10+
11+
```toml
12+
# Get the number of processes and group them by status
13+
[[inputs.processes]]
14+
# no configuration
15+
```
16+
17+
### Measurements & Fields:
18+
19+
- processes
20+
- blocked (aka disk sleep or uninterruptible sleep)
21+
- running
22+
- sleeping
23+
- stopped
24+
- total
25+
- zombie
26+
- wait (freebsd only)
27+
- idle (freebsd, openbsd and darwin only)
28+
- paging (linux only)
29+
- total_threads (linux only)
30+
31+
### Process State Mappings
32+
33+
Different OSes use slightly different state codes for their processes. These
34+
state codes are documented in `man ps`; the table below maps the state codes of
35+
each major OS to the corresponding telegraf metric:
36+
37+
```
38+
Linux FreeBSD Darwin meaning
39+
R R R running
40+
S S S sleeping
41+
Z Z Z zombie
42+
T T T stopped
43+
none I I idle (sleeping for longer than about 20 seconds)
44+
D D,L U blocked (waiting in uninterruptible sleep, or locked)
45+
W W none paging (linux kernel < 2.6 only), wait (freebsd)
46+
```
47+
48+
### Tags:
49+
50+
None
51+
52+
### Example Output:
53+
54+
```
55+
$ telegraf -config ~/ws/telegraf.conf -input-filter processes -test
56+
* Plugin: processes, Collection 1
57+
> processes blocked=8i,running=1i,sleeping=265i,stopped=0i,total=274i,zombie=0i,paging=0i,total_threads=687i 1457478636980905042
58+
```

plugins/inputs/system/processes.go

+184-29
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,216 @@
1+
// +build !windows
2+
13
package system
24

35
import (
6+
"bytes"
47
"fmt"
8+
"io/ioutil"
59
"log"
10+
"os"
11+
"os/exec"
12+
"path"
13+
"runtime"
14+
"strconv"
615

716
"github.com/influxdata/telegraf"
817
"github.com/influxdata/telegraf/plugins/inputs"
9-
"github.com/shirou/gopsutil/process"
1018
)
1119

1220
type Processes struct {
21+
execPS func() ([]byte, error)
22+
readProcFile func(statFile string) ([]byte, error)
23+
24+
forcePS bool
25+
forceProc bool
1326
}
1427

15-
func (_ *Processes) Description() string {
16-
return "Get the number of processes and group them by status (Linux only)"
28+
func (p *Processes) Description() string {
29+
return "Get the number of processes and group them by status"
1730
}
1831

19-
func (_ *Processes) SampleConfig() string { return "" }
32+
func (p *Processes) SampleConfig() string { return "" }
2033

21-
func (s *Processes) Gather(acc telegraf.Accumulator) error {
22-
pids, err := process.Pids()
23-
if err != nil {
24-
return fmt.Errorf("error getting pids list: %s", err)
34+
func (p *Processes) Gather(acc telegraf.Accumulator) error {
35+
// Get an empty map of metric fields
36+
fields := getEmptyFields()
37+
38+
// Decide if we will use 'ps' to get stats (use procfs otherwise)
39+
usePS := true
40+
if runtime.GOOS == "linux" {
41+
usePS = false
2542
}
26-
// TODO handle other OS (Windows/BSD/Solaris/OSX)
43+
if p.forcePS {
44+
usePS = true
45+
} else if p.forceProc {
46+
usePS = false
47+
}
48+
49+
// Gather stats from 'ps' or procfs
50+
if usePS {
51+
if err := p.gatherFromPS(fields); err != nil {
52+
return err
53+
}
54+
} else {
55+
if err := p.gatherFromProc(fields); err != nil {
56+
return err
57+
}
58+
}
59+
60+
acc.AddFields("processes", fields, nil)
61+
return nil
62+
}
63+
64+
// Gets empty fields of metrics based on the OS
65+
func getEmptyFields() map[string]interface{} {
2766
fields := map[string]interface{}{
28-
"paging": uint64(0),
29-
"blocked": uint64(0),
30-
"zombie": uint64(0),
31-
"stopped": uint64(0),
32-
"running": uint64(0),
33-
"sleeping": uint64(0),
34-
}
35-
for _, pid := range pids {
36-
process, err := process.NewProcess(pid)
37-
if err != nil {
38-
log.Printf("Can not get process %d status: %s", pid, err)
67+
"blocked": int64(0),
68+
"zombie": int64(0),
69+
"stopped": int64(0),
70+
"running": int64(0),
71+
"sleeping": int64(0),
72+
"total": int64(0),
73+
}
74+
switch runtime.GOOS {
75+
case "freebsd":
76+
fields["idle"] = int64(0)
77+
fields["wait"] = int64(0)
78+
case "darwin":
79+
fields["idle"] = int64(0)
80+
case "openbsd":
81+
fields["idle"] = int64(0)
82+
case "linux":
83+
fields["paging"] = int64(0)
84+
fields["total_threads"] = int64(0)
85+
}
86+
return fields
87+
}
88+
89+
// exec `ps` to get all process states
90+
func (p *Processes) gatherFromPS(fields map[string]interface{}) error {
91+
out, err := p.execPS()
92+
if err != nil {
93+
return err
94+
}
95+
96+
for i, status := range bytes.Fields(out) {
97+
if i == 0 && string(status) == "STAT" {
98+
// This is a header, skip it
99+
continue
100+
}
101+
switch status[0] {
102+
case 'W':
103+
fields["wait"] = fields["wait"].(int64) + int64(1)
104+
case 'U', 'D', 'L':
105+
// Also known as uninterruptible sleep or disk sleep
106+
fields["blocked"] = fields["blocked"].(int64) + int64(1)
107+
case 'Z':
108+
fields["zombie"] = fields["zombie"].(int64) + int64(1)
109+
case 'T':
110+
fields["stopped"] = fields["stopped"].(int64) + int64(1)
111+
case 'R':
112+
fields["running"] = fields["running"].(int64) + int64(1)
113+
case 'S':
114+
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
115+
case 'I':
116+
fields["idle"] = fields["idle"].(int64) + int64(1)
117+
default:
118+
log.Printf("processes: Unknown state [ %s ] from ps",
119+
string(status[0]))
120+
}
121+
fields["total"] = fields["total"].(int64) + int64(1)
122+
}
123+
return nil
124+
}
125+
126+
// get process states from /proc/(pid)/stat files
127+
func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
128+
files, err := ioutil.ReadDir("/proc")
129+
if err != nil {
130+
return err
131+
}
132+
133+
for _, file := range files {
134+
if !file.IsDir() {
39135
continue
40136
}
41-
status, err := process.Status()
137+
138+
statFile := path.Join("/proc", file.Name(), "stat")
139+
data, err := p.readProcFile(statFile)
42140
if err != nil {
43-
log.Printf("Can not get process %d status: %s\n", pid, err)
141+
return err
142+
}
143+
if data == nil {
44144
continue
45145
}
46-
_, exists := fields[status]
47-
if !exists {
48-
log.Printf("Status '%s' for process with pid: %d\n", status, pid)
146+
147+
stats := bytes.Fields(data)
148+
if len(stats) < 3 {
149+
return fmt.Errorf("Something is terribly wrong with %s", statFile)
150+
}
151+
switch stats[2][0] {
152+
case 'R':
153+
fields["running"] = fields["running"].(int64) + int64(1)
154+
case 'S':
155+
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
156+
case 'D':
157+
fields["blocked"] = fields["blocked"].(int64) + int64(1)
158+
case 'Z':
159+
fields["zombies"] = fields["zombies"].(int64) + int64(1)
160+
case 'T', 't':
161+
fields["stopped"] = fields["stopped"].(int64) + int64(1)
162+
case 'W':
163+
fields["paging"] = fields["paging"].(int64) + int64(1)
164+
default:
165+
log.Printf("processes: Unknown state [ %s ] in file %s",
166+
string(stats[2][0]), statFile)
167+
}
168+
fields["total"] = fields["total"].(int64) + int64(1)
169+
170+
threads, err := strconv.Atoi(string(stats[19]))
171+
if err != nil {
172+
log.Printf("processes: Error parsing thread count: %s", err)
49173
continue
50174
}
51-
fields[status] = fields[status].(uint64) + uint64(1)
175+
fields["total_threads"] = fields["total_threads"].(int64) + int64(threads)
52176
}
53-
54-
acc.AddFields("processes", fields, nil)
55177
return nil
56178
}
179+
180+
// readProcFile returns the contents of statFile, or (nil, nil) when the file
// does not exist. Any other read failure is returned as the error.
//
// The file is read in a single step rather than being checked with os.Stat
// first: the file can disappear between a check and the read, and that case
// must be reported as "no data" rather than as an error.
func readProcFile(statFile string) ([]byte, error) {
	data, err := ioutil.ReadFile(statFile)
	if os.IsNotExist(err) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return data, nil
}
194+
195+
// execPS locates the `ps` binary on PATH and runs `ps axo state`, returning
// the command's raw standard output. A nil slice is returned together with
// any lookup or execution error.
func execPS() ([]byte, error) {
	psPath, lookErr := exec.LookPath("ps")
	if lookErr != nil {
		return nil, lookErr
	}

	cmd := exec.Command(psPath, "axo", "state")
	stdout, runErr := cmd.Output()
	if runErr != nil {
		return nil, runErr
	}
	return stdout, nil
}
208+
57209
func init() {
58210
inputs.Add("processes", func() telegraf.Input {
59-
return &Processes{}
211+
return &Processes{
212+
execPS: execPS,
213+
readProcFile: readProcFile,
214+
}
60215
})
61216
}

0 commit comments

Comments
 (0)