Skip to content

Commit 84bf0ae

Browse files
authored
Merge pull request #3 from hogklint/feat/retry-failed-tails
Resume logs on failure and restart
2 parents ce90bc7 + 75c2235 commit 84bf0ae

7 files changed

+231
-171
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/spf13/cobra v1.8.1
1515
github.com/spf13/pflag v1.0.5
1616
golang.org/x/sync v0.8.0
17+
golang.org/x/time v0.7.0
1718
gopkg.in/yaml.v3 v3.0.1
1819
k8s.io/klog/v2 v2.130.1
1920
)
@@ -48,7 +49,6 @@ require (
4849
go.opentelemetry.io/otel/trace v1.27.0 // indirect
4950
golang.org/x/net v0.30.0 // indirect
5051
golang.org/x/sys v0.26.0 // indirect
51-
golang.org/x/time v0.7.0 // indirect
5252
google.golang.org/protobuf v1.34.2 // indirect
5353
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
5454
gotest.tools/v3 v3.5.1 // indirect

stern/docker_stern.go

+51-27
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
dockerclient "github.com/docker/docker/client"
1212
"golang.org/x/sync/errgroup"
13-
"k8s.io/klog/v2"
13+
"golang.org/x/time/rate"
1414
)
1515

1616
func RunDocker(ctx context.Context, client *dockerclient.Client, config *DockerConfig) error {
@@ -19,7 +19,7 @@ func RunDocker(ctx context.Context, client *dockerclient.Client, config *DockerC
1919
Timestamps: config.Timestamps,
2020
TimestampFormat: config.TimestampFormat,
2121
Location: config.Location,
22-
DockerSinceTime: time.Now().Add(-config.Since),
22+
DockerSinceTime: time.Now().Add(-config.Since).Format(time.RFC3339),
2323
Exclude: config.Exclude,
2424
Include: config.Include,
2525
Highlight: config.Highlight,
@@ -35,9 +35,6 @@ func RunDocker(ctx context.Context, client *dockerclient.Client, config *DockerC
3535
target.Name,
3636
target.ComposeProject,
3737
target.Tty,
38-
target.StartedAt,
39-
target.FinishedAt,
40-
target.SeenPreviously,
4138
config.Template,
4239
config.Out,
4340
config.ErrOut,
@@ -50,26 +47,16 @@ func RunDocker(ctx context.Context, client *dockerclient.Client, config *DockerC
5047
return tail.Start()
5148
}
5249

53-
filter := newDockerTargetFilter(dockerTargetFilterConfig{
54-
containerFilter: config.ContainerQuery,
55-
containerExcludeFilter: config.ExcludeContainerQuery,
56-
composeProjectFilter: config.ComposeProjectQuery,
57-
imageFilter: config.ImageQuery,
58-
},
50+
filter := newDockerTargetFilter(
51+
dockerTargetFilterConfig{
52+
containerFilter: config.ContainerQuery,
53+
containerExcludeFilter: config.ExcludeContainerQuery,
54+
composeProjectFilter: config.ComposeProjectQuery,
55+
imageFilter: config.ImageQuery,
56+
},
5957
max(config.MaxLogRequests*2, 100),
6058
)
6159

62-
tailTarget := func(target *DockerTarget) error {
63-
tail := newTail(target)
64-
defer tail.Close()
65-
err := tail.Start(ctx)
66-
if err != nil && filter.isActive(target) {
67-
fmt.Fprintf(config.ErrOut, "failed to tail %s: %v\n", target.Name, err)
68-
return err
69-
}
70-
return nil
71-
}
72-
7360
if !config.Follow {
7461
containers, err := FilteredContainerGenerator(ctx, config, client, filter)
7562
if err != nil {
@@ -81,7 +68,14 @@ func RunDocker(ctx context.Context, client *dockerclient.Client, config *DockerC
8168
for target := range containers {
8269
target := target
8370
eg.Go(func() error {
84-
return tailTarget(target)
71+
tail := newTail(target)
72+
defer tail.Close()
73+
err := tail.Start(ctx)
74+
if err != nil && filter.isActive(target) {
75+
fmt.Fprintf(config.ErrOut, "failed to tail %s: %v\n", target.Name, err)
76+
return err
77+
}
78+
return nil
8579
})
8680
}
8781
return eg.Wait()
@@ -93,6 +87,39 @@ func RunDocker(ctx context.Context, client *dockerclient.Client, config *DockerC
9387
return err
9488
}
9589

90+
tailTarget := func(target *DockerTarget) {
91+
limiter := rate.NewLimiter(rate.Every(time.Second*20), 2)
92+
resumeRequest := target.ResumeRequest
93+
for {
94+
if err := limiter.Wait(ctx); err != nil {
95+
fmt.Fprintf(config.ErrOut, "failed to retry: %v\n", err)
96+
return
97+
}
98+
tail := newTail(target)
99+
var err error
100+
if resumeRequest == nil {
101+
err = tail.Start(ctx)
102+
} else {
103+
err = tail.Resume(ctx, resumeRequest)
104+
}
105+
tail.Close()
106+
107+
if err == nil {
108+
filter.setResumeRequest(target.Id, tail.GetResumeRequest())
109+
return
110+
}
111+
if !filter.isActive(target) {
112+
filter.setResumeRequest(target.Id, tail.GetResumeRequest())
113+
fmt.Fprintf(config.ErrOut, "failed to tail: %v\n", err)
114+
return
115+
}
116+
fmt.Fprintf(config.ErrOut, "failed to tail: %v, will retry\n", err)
117+
if resumeReq := tail.GetResumeRequest(); resumeReq != nil {
118+
resumeRequest = resumeReq
119+
}
120+
}
121+
}
122+
96123
var numRequests atomic.Int64
97124
for {
98125
target := <-added
@@ -103,10 +130,7 @@ func RunDocker(ctx context.Context, client *dockerclient.Client, config *DockerC
103130
config.MaxLogRequests)
104131
}
105132
go func() {
106-
err := tailTarget(target)
107-
if err != nil {
108-
klog.V(7).ErrorS(err, "Error tailing container", "id", target.Id, "name", target.Name)
109-
}
133+
tailTarget(target)
110134
numRequests.Add(-1)
111135
}()
112136
}

stern/docker_tail.go

+56-51
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import (
99
"io"
1010
"strings"
1111
"text/template"
12-
"time"
1312

1413
"github.com/docker/docker/api/types/container"
1514
dockerclient "github.com/docker/docker/client"
15+
"github.com/docker/docker/errdefs"
1616
"github.com/fatih/color"
1717
"k8s.io/klog/v2"
1818
)
@@ -23,16 +23,18 @@ type DockerTail struct {
2323
ContainerName string
2424
ComposeProject string
2525
Tty bool
26-
StartedAt time.Time
27-
FinishedAt string
28-
SeenPreviously bool
2926
composeColor *color.Color
3027
containerColor *color.Color
3128
Options *TailOptions
3229
tmpl *template.Template
3330
closed chan struct{}
34-
out io.Writer
35-
errOut io.Writer
31+
last struct {
32+
timestamp string // RFC3339 timestamp (not RFC3339Nano)
33+
lines int // the number of lines seen during this timestamp
34+
}
35+
resumeRequest *ResumeRequest
36+
out io.Writer
37+
errOut io.Writer
3638
}
3739

3840
func NewDockerTail(
@@ -41,9 +43,6 @@ func NewDockerTail(
4143
containerName string,
4244
composeProject string,
4345
tty bool,
44-
startedAt time.Time,
45-
finishedAt string,
46-
seenPreviously bool,
4746
tmpl *template.Template,
4847
out, errOut io.Writer,
4948
options *TailOptions,
@@ -56,9 +55,6 @@ func NewDockerTail(
5655
ContainerName: containerName,
5756
ComposeProject: composeProject,
5857
Tty: tty,
59-
StartedAt: startedAt,
60-
FinishedAt: finishedAt,
61-
SeenPreviously: seenPreviously,
6258
Options: options,
6359
composeColor: composeColor,
6460
containerColor: containerColor,
@@ -86,43 +82,6 @@ func (t *DockerTail) Start(ctx context.Context) error {
8682

8783
t.printStarting()
8884

89-
err := t.consumeRequest(ctx)
90-
if err != nil {
91-
klog.V(7).ErrorS(err, "Error fetching logs for container", "name", t.ContainerName, "id", t.ContainerId)
92-
if errors.Is(err, context.Canceled) {
93-
return nil
94-
}
95-
}
96-
97-
return err
98-
}
99-
100-
func (t *DockerTail) Close() {
101-
t.printStopping()
102-
close(t.closed)
103-
}
104-
105-
// Since log streams end when containers terminate tailfin must keep track of when a container has been previously
106-
// tailed and use a "since"-time from last finish/start. Otherwise it will include logs from previous starts thus
107-
// printing logs already printed.
108-
func (t *DockerTail) getSinceTime() time.Time {
109-
if !t.SeenPreviously ||
110-
t.StartedAt.Before(t.Options.DockerSinceTime) {
111-
return t.Options.DockerSinceTime
112-
}
113-
114-
// If there's no finish time it should mean the container only started once so it's safe to use the options time.
115-
finished, err := time.Parse(time.RFC3339, t.FinishedAt)
116-
if err != nil {
117-
return t.Options.DockerSinceTime
118-
} else if finished.Before(t.StartedAt) {
119-
// Sometimes early logs are missing if StartedAt is used
120-
return finished
121-
}
122-
return t.StartedAt
123-
}
124-
125-
func (t *DockerTail) consumeRequest(ctx context.Context) error {
12685
logs, err := t.client.ContainerLogs(
12786
ctx,
12887
t.ContainerId,
@@ -131,7 +90,7 @@ func (t *DockerTail) consumeRequest(ctx context.Context) error {
13190
ShowStderr: true,
13291
Follow: t.Options.Follow,
13392
Timestamps: true,
134-
Since: t.getSinceTime().Format(time.RFC3339),
93+
Since: t.Options.DockerSinceTime,
13594
Tail: t.Options.DockerTailLines,
13695
},
13796
)
@@ -140,6 +99,30 @@ func (t *DockerTail) consumeRequest(ctx context.Context) error {
14099
}
141100
defer logs.Close()
142101

102+
err = t.consumeStream(ctx, logs)
103+
if err != nil {
104+
klog.V(7).ErrorS(err, "Error fetching logs for container", "name", t.ContainerName, "id", t.ContainerId)
105+
if errors.Is(err, context.Canceled) || errdefs.IsConflict(err) {
106+
return nil
107+
}
108+
}
109+
110+
return err
111+
}
112+
113+
func (t *DockerTail) Close() {
114+
t.printStopping()
115+
close(t.closed)
116+
}
117+
118+
func (t *DockerTail) Resume(ctx context.Context, resumeRequest *ResumeRequest) error {
119+
t.resumeRequest = resumeRequest
120+
t.Options.DockerSinceTime = resumeRequest.Timestamp
121+
t.Options.DockerTailLines = "-1"
122+
return t.Start(ctx)
123+
}
124+
125+
func (t *DockerTail) consumeStream(ctx context.Context, logs io.Reader) error {
143126
r := bufio.NewReader(logs)
144127
for {
145128
line, err := r.ReadBytes('\n')
@@ -162,6 +145,13 @@ func (t *DockerTail) consumeLine(line string) {
162145
return
163146
}
164147

148+
rfc3339 := removeSubsecond(rfc3339Nano)
149+
t.rememberLastTimestamp(rfc3339)
150+
if t.resumeRequest.shouldSkip(rfc3339) {
151+
return
152+
}
153+
t.resumeRequest = nil
154+
165155
if t.Options.IsExclude(content) || !t.Options.IsInclude(content) {
166156
return
167157
}
@@ -236,6 +226,21 @@ func trimLeadingChars(line string, tty bool) string {
236226
klog.V(7).InfoS("Invalid log line format received", "line", line)
237227
return ""
238228
}
239-
240229
return line[8:]
241230
}
231+
232+
func (t *DockerTail) rememberLastTimestamp(timestamp string) {
233+
if t.last.timestamp == timestamp {
234+
t.last.lines++
235+
return
236+
}
237+
t.last.timestamp = timestamp
238+
t.last.lines = 1
239+
}
240+
241+
func (t *DockerTail) GetResumeRequest() *ResumeRequest {
242+
if t.last.timestamp == "" {
243+
return nil
244+
}
245+
return &ResumeRequest{Timestamp: t.last.timestamp, LinesToSkip: t.last.lines}
246+
}

0 commit comments

Comments
 (0)