Skip to content

Commit 16454e2

Browse files
phemmerdanielnelson
authored andcommitted
Fix postfix input handling of multi-level queues (#4333)
1 parent 2a1feb6 commit 16454e2

File tree

3 files changed

+29
-54
lines changed

3 files changed

+29
-54
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
- [#4277](https://github.com/influxdata/telegraf/pull/4277): Treat sigterm as a clean shutdown signal.
3030
- [#4284](https://github.com/influxdata/telegraf/pull/4284): Fix selection of tags under nested objects in the JSON parser.
31+
- [#4135](https://github.com/influxdata/telegraf/issues/4135): Fix postfix input handling multi-level queues.
3132

3233
## v1.7 [2018-06-12]
3334

plugins/inputs/postfix/postfix.go

+19-42
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"fmt"
55
"os"
66
"os/exec"
7-
"path"
7+
"path/filepath"
88
"strings"
99
"time"
1010

@@ -28,36 +28,37 @@ func getQueueDirectory() (string, error) {
2828
return strings.TrimSpace(string(qd)), nil
2929
}
3030

31-
func qScan(path string) (int64, int64, int64, error) {
32-
f, err := os.Open(path)
33-
if err != nil {
34-
return 0, 0, 0, err
35-
}
36-
37-
finfos, err := f.Readdir(-1)
38-
f.Close()
39-
if err != nil {
40-
return 0, 0, 0, err
41-
}
42-
31+
func qScan(path string, acc telegraf.Accumulator) (int64, int64, int64, error) {
4332
var length, size int64
4433
var oldest time.Time
45-
for _, finfo := range finfos {
34+
err := filepath.Walk(path, func(_ string, finfo os.FileInfo, err error) error {
35+
if err != nil {
36+
acc.AddError(fmt.Errorf("error scanning %s: %s", path, err))
37+
return nil
38+
}
39+
if finfo.IsDir() {
40+
return nil
41+
}
42+
4643
length++
4744
size += finfo.Size()
4845

4946
ctime := statCTime(finfo.Sys())
5047
if ctime.IsZero() {
51-
continue
48+
return nil
5249
}
5350
if oldest.IsZero() || ctime.Before(oldest) {
5451
oldest = ctime
5552
}
53+
return nil
54+
})
55+
if err != nil {
56+
return 0, 0, 0, err
5657
}
5758
var age int64
5859
if !oldest.IsZero() {
5960
age = int64(time.Now().Sub(oldest) / time.Second)
60-
} else if len(finfos) != 0 {
61+
} else if length != 0 {
6162
// system doesn't support ctime
6263
age = -1
6364
}
@@ -77,8 +78,8 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
7778
}
7879
}
7980

80-
for _, q := range []string{"active", "hold", "incoming", "maildrop"} {
81-
length, size, age, err := qScan(path.Join(p.QueueDirectory, q))
81+
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
82+
length, size, age, err := qScan(filepath.Join(p.QueueDirectory, q), acc)
8283
if err != nil {
8384
acc.AddError(fmt.Errorf("error scanning queue %s: %s", q, err))
8485
continue
@@ -90,30 +91,6 @@ func (p *Postfix) Gather(acc telegraf.Accumulator) error {
9091
acc.AddFields("postfix_queue", fields, map[string]string{"queue": q})
9192
}
9293

93-
var dLength, dSize int64
94-
dAge := int64(-1)
95-
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"} {
96-
length, size, age, err := qScan(path.Join(p.QueueDirectory, "deferred", q))
97-
if err != nil {
98-
if os.IsNotExist(err) {
99-
// the directories are created on first use
100-
continue
101-
}
102-
acc.AddError(fmt.Errorf("error scanning queue deferred/%s: %s", q, err))
103-
return nil
104-
}
105-
dLength += length
106-
dSize += size
107-
if age > dAge {
108-
dAge = age
109-
}
110-
}
111-
fields := map[string]interface{}{"length": dLength, "size": dSize}
112-
if dAge != -1 {
113-
fields["age"] = dAge
114-
}
115-
acc.AddFields("postfix_queue", fields, map[string]string{"queue": "deferred"})
116-
11794
return nil
11895
}
11996

plugins/inputs/postfix/postfix_test.go

+9-12
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package postfix
33
import (
44
"io/ioutil"
55
"os"
6-
"path"
6+
"path/filepath"
77
"testing"
88

99
"github.com/influxdata/telegraf/testutil"
@@ -16,19 +16,16 @@ func TestGather(t *testing.T) {
1616
require.NoError(t, err)
1717
defer os.RemoveAll(td)
1818

19-
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred"} {
20-
require.NoError(t, os.Mkdir(path.Join(td, q), 0755))
21-
}
22-
for _, q := range []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "F"} { // "E" deliberately left off
23-
require.NoError(t, os.Mkdir(path.Join(td, "deferred", q), 0755))
19+
for _, q := range []string{"active", "hold", "incoming", "maildrop", "deferred/0/0", "deferred/F/F"} {
20+
require.NoError(t, os.MkdirAll(filepath.FromSlash(td+"/"+q), 0755))
2421
}
2522

26-
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "01"), []byte("abc"), 0644))
27-
require.NoError(t, ioutil.WriteFile(path.Join(td, "active", "02"), []byte("defg"), 0644))
28-
require.NoError(t, ioutil.WriteFile(path.Join(td, "hold", "01"), []byte("abc"), 0644))
29-
require.NoError(t, ioutil.WriteFile(path.Join(td, "incoming", "01"), []byte("abcd"), 0644))
30-
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "0", "01"), []byte("abc"), 0644))
31-
require.NoError(t, ioutil.WriteFile(path.Join(td, "deferred", "F", "F1"), []byte("abc"), 0644))
23+
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/01"), []byte("abc"), 0644))
24+
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/active/02"), []byte("defg"), 0644))
25+
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/hold/01"), []byte("abc"), 0644))
26+
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/incoming/01"), []byte("abcd"), 0644))
27+
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/0/0/01"), []byte("abc"), 0644))
28+
require.NoError(t, ioutil.WriteFile(filepath.FromSlash(td+"/deferred/F/F/F1"), []byte("abc"), 0644))
3229

3330
p := Postfix{
3431
QueueDirectory: td,

0 commit comments

Comments
 (0)