Skip to content

Commit d214a58

Browse files
authored
Use docker log timestamp as metric time (influxdata#7434)
1 parent de6df67 commit d214a58

File tree

2 files changed

+54
-21
lines changed

2 files changed

+54
-21
lines changed

plugins/inputs/docker_log/docker_log.go

+39-16
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package docker_log
22

33
import (
44
"bufio"
5+
"bytes"
56
"context"
67
"crypto/tls"
8+
"fmt"
79
"io"
810
"strings"
911
"sync"
@@ -287,7 +289,7 @@ func (d *DockerLogs) tailContainerLogs(
287289
logOptions := types.ContainerLogsOptions{
288290
ShowStdout: true,
289291
ShowStderr: true,
290-
Timestamps: false,
292+
Timestamps: true,
291293
Details: false,
292294
Follow: true,
293295
Tail: tail,
@@ -311,6 +313,30 @@ func (d *DockerLogs) tailContainerLogs(
311313
}
312314
}
313315

316+
func parseLine(line []byte) (time.Time, string, error) {
317+
parts := bytes.SplitN(line, []byte(" "), 2)
318+
319+
switch len(parts) {
320+
case 1:
321+
parts = append(parts, []byte(""))
322+
}
323+
324+
tsString := string(parts[0])
325+
326+
// Keep any leading space, but remove whitespace from end of line.
327+
// This preserves space in, for example, stacktraces, while removing
328+
// annoying end of line characters and is similar to how other logging
329+
// plugins such as syslog behave.
330+
message := bytes.TrimRightFunc(parts[1], unicode.IsSpace)
331+
332+
ts, err := time.Parse(time.RFC3339Nano, tsString)
333+
if err != nil {
334+
return time.Time{}, "", fmt.Errorf("error parsing timestamp %q: %v", tsString, err)
335+
}
336+
337+
return ts, string(message), nil
338+
}
339+
314340
func tailStream(
315341
acc telegraf.Accumulator,
316342
baseTags map[string]string,
@@ -328,22 +354,19 @@ func tailStream(
328354

329355
r := bufio.NewReaderSize(reader, 64*1024)
330356

331-
var err error
332-
var message string
333357
for {
334-
message, err = r.ReadString('\n')
335-
336-
// Keep any leading space, but remove whitespace from end of line.
337-
// This preserves space in, for example, stacktraces, while removing
338-
// annoying end of line characters and is similar to how other logging
339-
// plugins such as syslog behave.
340-
message = strings.TrimRightFunc(message, unicode.IsSpace)
341-
342-
if len(message) != 0 {
343-
acc.AddFields("docker_log", map[string]interface{}{
344-
"container_id": containerID,
345-
"message": message,
346-
}, tags)
358+
line, err := r.ReadBytes('\n')
359+
360+
if len(line) != 0 {
361+
ts, message, err := parseLine(line)
362+
if err != nil {
363+
acc.AddError(err)
364+
} else {
365+
acc.AddFields("docker_log", map[string]interface{}{
366+
"container_id": containerID,
367+
"message": message,
368+
}, tags, ts)
369+
}
347370
}
348371

349372
if err != nil {

plugins/inputs/docker_log/docker_log_test.go

+15-5
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ func (r *Response) Close() error {
5353
return nil
5454
}
5555

56+
func MustParse(layout, value string) time.Time {
57+
tm, err := time.Parse(layout, value)
58+
if err != nil {
59+
panic(err)
60+
}
61+
return tm
62+
}
63+
5664
func Test(t *testing.T) {
5765
tests := []struct {
5866
name string
@@ -87,7 +95,7 @@ func Test(t *testing.T) {
8795
}, nil
8896
},
8997
ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
90-
return &Response{Reader: bytes.NewBuffer([]byte("hello\n"))}, nil
98+
return &Response{Reader: bytes.NewBuffer([]byte("2020-04-28T18:43:16.432691200Z hello\n"))}, nil
9199
},
92100
},
93101
expected: []telegraf.Metric{
@@ -104,7 +112,7 @@ func Test(t *testing.T) {
104112
"container_id": "deadbeef",
105113
"message": "hello",
106114
},
107-
time.Now(),
115+
MustParse(time.RFC3339Nano, "2020-04-28T18:43:16.432691200Z"),
108116
),
109117
},
110118
},
@@ -130,7 +138,7 @@ func Test(t *testing.T) {
130138
ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
131139
var buf bytes.Buffer
132140
w := stdcopy.NewStdWriter(&buf, stdcopy.Stdout)
133-
w.Write([]byte("hello from stdout"))
141+
w.Write([]byte("2020-04-28T18:42:16.432691200Z hello from stdout"))
134142
return &Response{Reader: &buf}, nil
135143
},
136144
},
@@ -148,7 +156,7 @@ func Test(t *testing.T) {
148156
"container_id": "deadbeef",
149157
"message": "hello from stdout",
150158
},
151-
time.Now(),
159+
MustParse(time.RFC3339Nano, "2020-04-28T18:42:16.432691200Z"),
152160
),
153161
},
154162
},
@@ -172,7 +180,9 @@ func Test(t *testing.T) {
172180
acc.Wait(len(tt.expected))
173181
plugin.Stop()
174182

175-
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
183+
require.Nil(t, acc.Errors) // no errors during gathering
184+
185+
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics())
176186
})
177187
}
178188
}

0 commit comments

Comments
 (0)