Skip to content

Commit 7bf5d66

Browse files
authored
[pkg/stanza] Add an option to resend logs instead of dropping (#20864)
Add a `retry_on_failure` config option (disabled by default) to stanza receivers that can be used to slow down reading logs instead of dropping them if downstream components return a non-permanent error. The configuration has the following options: - `enabled`: Enable or disable the retry mechanism. Default is `false`. - `initial_interval`: The initial interval to wait before retrying. Default is `1s`. - `max_interval`: The maximum interval to wait before retrying. Default is `30s`. - `max_elapsed_time`: The maximum amount of time to wait before giving up. Default is `5m`. The configuration interface is inspired by https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/exporterhelper#configuration, which could potentially be exposed as a separate package that is not specific to exporters and can be used by any component.
1 parent 5616ef8 commit 7bf5d66

File tree

43 files changed

+586
-134
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+586
-134
lines changed

.chloggen/filelog-receiver-retry.yaml

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
5+
component: pkg/stanza
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add an option to pause reading a file and attempt to resend the current batch of logs if it encounters an error from downstream components.
9+
10+
# One or more tracking issues related to the change
11+
issues: [20511]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
Add a `retry_on_failure` config option (disabled by default) that can be used to slow down reading logs instead of
18+
dropping logs if downstream components return a non-permanent error. The configuration has the following options:
19+
- `enabled`: Enable or disable the retry mechanism. Default is `false`.
20+
- `initial_interval`: The initial interval to wait before retrying. Default is `1s`.
21+
- `max_interval`: The maximum interval to wait before retrying. Default is `30s`.
22+
- `max_elapsed_time`: The maximum amount of time to wait before giving up. Default is `5m`.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package consumerretry // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry"
16+
17+
import "time"
18+
19+
// Config defines configuration for retrying batches in case of receiving a retryable error from a downstream
20+
// consumer. If the retryable error doesn't provide a delay, exponential backoff is applied.
21+
type Config struct {
22+
// Enabled indicates whether to retry sending logs in case of receiving a retryable error from a downstream
23+
// consumer. Default is false.
24+
Enabled bool `mapstructure:"enabled"`
25+
// InitialInterval is the time to wait after the first failure before retrying. Default value is 1 second.
26+
InitialInterval time.Duration `mapstructure:"initial_interval"`
27+
// MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between
28+
// consecutive retries will always be `MaxInterval`. Default value is 30 seconds.
29+
MaxInterval time.Duration `mapstructure:"max_interval"`
30+
// MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a logs batch to
31+
// a downstream consumer. Once this value is reached, the data is discarded. It never stops if MaxElapsedTime == 0.
32+
// Default value is 5 minutes.
33+
MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"`
34+
}
35+
36+
// NewDefaultConfig returns the default Config.
37+
func NewDefaultConfig() Config {
38+
return Config{
39+
Enabled: false,
40+
InitialInterval: 1 * time.Second,
41+
MaxInterval: 30 * time.Second,
42+
MaxElapsedTime: 5 * time.Minute,
43+
}
44+
}
+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package consumerretry // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry"
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
"time"
22+
23+
"github.com/cenkalti/backoff/v4"
24+
"go.opentelemetry.io/collector/consumer"
25+
"go.opentelemetry.io/collector/consumer/consumererror"
26+
"go.opentelemetry.io/collector/pdata/plog"
27+
"go.opentelemetry.io/otel/attribute"
28+
"go.opentelemetry.io/otel/trace"
29+
"go.uber.org/zap"
30+
)
31+
32+
// logsConsumer wraps a consumer.Logs together with the retry settings and logger used by its ConsumeLogs method.
type logsConsumer struct {
33+
// Logs is the embedded downstream consumer that ConsumeLogs forwards batches to.
consumer.Logs
34+
// cfg holds the retry settings: the enable flag and the backoff bounds.
cfg Config
35+
// logger receives the error and debug messages emitted when a delivery fails.
logger *zap.Logger
36+
}
37+
38+
func NewLogs(config Config, logger *zap.Logger, next consumer.Logs) consumer.Logs {
39+
return &logsConsumer{
40+
Logs: next,
41+
cfg: config,
42+
logger: logger,
43+
}
44+
}
45+
46+
func (lc *logsConsumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
47+
if !lc.cfg.Enabled {
48+
err := lc.Logs.ConsumeLogs(ctx, logs)
49+
if err != nil {
50+
lc.logger.Error("ConsumeLogs() failed. "+
51+
"Enable retry_on_failure to slow down reading logs and avoid dropping.", zap.Error(err))
52+
}
53+
return err
54+
}
55+
56+
// Do not use NewExponentialBackOff since it calls Reset and the code here must
57+
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
58+
expBackoff := backoff.ExponentialBackOff{
59+
MaxElapsedTime: lc.cfg.MaxElapsedTime,
60+
InitialInterval: lc.cfg.InitialInterval,
61+
MaxInterval: lc.cfg.MaxInterval,
62+
RandomizationFactor: backoff.DefaultRandomizationFactor,
63+
Multiplier: backoff.DefaultMultiplier,
64+
Stop: backoff.Stop,
65+
Clock: backoff.SystemClock,
66+
}
67+
expBackoff.Reset()
68+
69+
span := trace.SpanFromContext(ctx)
70+
retryNum := int64(0)
71+
retryableErr := consumererror.Logs{}
72+
for {
73+
span.AddEvent(
74+
"Sending logs.",
75+
trace.WithAttributes(attribute.Int64("retry_num", retryNum)))
76+
77+
err := lc.Logs.ConsumeLogs(ctx, logs)
78+
if err == nil {
79+
return nil
80+
}
81+
82+
if consumererror.IsPermanent(err) {
83+
lc.logger.Error(
84+
"ConsumeLogs() failed. The error is not retryable. Dropping data.",
85+
zap.Error(err),
86+
zap.Int("dropped_items", logs.LogRecordCount()),
87+
)
88+
return err
89+
}
90+
91+
if errors.As(err, &retryableErr) {
92+
logs = retryableErr.Data()
93+
}
94+
95+
// TODO: take delay from the error once it is available in the consumererror package.
96+
backoffDelay := expBackoff.NextBackOff()
97+
if backoffDelay == backoff.Stop {
98+
lc.logger.Error("Max elapsed time expired. Dropping data.", zap.Error(err), zap.Int("dropped_items",
99+
logs.LogRecordCount()))
100+
return err
101+
}
102+
103+
backoffDelayStr := backoffDelay.String()
104+
span.AddEvent(
105+
"ConsumeLogs() failed. Will retry the request after interval.",
106+
trace.WithAttributes(
107+
attribute.String("interval", backoffDelayStr),
108+
attribute.String("error", err.Error())))
109+
lc.logger.Debug(
110+
"ConsumeLogs() failed. Will retry the request after interval.",
111+
zap.Error(err),
112+
zap.String("interval", backoffDelayStr),
113+
zap.Int("logs_count", logs.LogRecordCount()),
114+
)
115+
retryNum++
116+
117+
// back-off, but get interrupted when shutting down or request is cancelled or timed out.
118+
select {
119+
case <-ctx.Done():
120+
return fmt.Errorf("context is cancelled or timed out %w", err)
121+
case <-time.After(backoffDelay):
122+
}
123+
}
124+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package consumerretry
16+
17+
import (
18+
"context"
19+
"errors"
20+
"testing"
21+
"time"
22+
23+
"github.com/stretchr/testify/assert"
24+
"go.opentelemetry.io/collector/consumer/consumererror"
25+
"go.uber.org/zap"
26+
27+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
28+
)
29+
30+
func TestConsumeLogs(t *testing.T) {
31+
tests := []struct {
32+
name string
33+
cfg Config
34+
consumer *MockLogsRejecter
35+
expectedErr error
36+
}{
37+
{
38+
name: "no_retry_success",
39+
expectedErr: nil,
40+
cfg: NewDefaultConfig(),
41+
consumer: NewMockLogsRejecter(0),
42+
},
43+
{
44+
name: "permanent_error",
45+
expectedErr: consumererror.NewPermanent(errors.New("permanent error")),
46+
cfg: Config{Enabled: true},
47+
consumer: NewMockLogsRejecter(-1),
48+
},
49+
{
50+
name: "timeout_error",
51+
expectedErr: errors.New("retry later"),
52+
cfg: Config{
53+
Enabled: true,
54+
InitialInterval: 1 * time.Millisecond,
55+
MaxInterval: 5 * time.Millisecond,
56+
MaxElapsedTime: 10 * time.Millisecond,
57+
},
58+
consumer: NewMockLogsRejecter(20),
59+
},
60+
{
61+
name: "retry_success",
62+
expectedErr: nil,
63+
cfg: Config{
64+
Enabled: true,
65+
InitialInterval: 1 * time.Millisecond,
66+
MaxInterval: 2 * time.Millisecond,
67+
MaxElapsedTime: 100 * time.Millisecond,
68+
},
69+
consumer: NewMockLogsRejecter(5),
70+
},
71+
}
72+
73+
for _, tt := range tests {
74+
t.Run(tt.name, func(t *testing.T) {
75+
consumer := NewLogs(tt.cfg, zap.NewNop(), tt.consumer)
76+
err := consumer.ConsumeLogs(context.Background(), testdata.GenerateLogsTwoLogRecordsSameResource())
77+
assert.Equal(t, tt.expectedErr, err)
78+
if err == nil {
79+
assert.Equal(t, 1, len(tt.consumer.AllLogs()))
80+
assert.Equal(t, 2, tt.consumer.AllLogs()[0].LogRecordCount())
81+
if tt.consumer.acceptAfter > 0 {
82+
assert.Equal(t, tt.consumer.rejectCount.Load(), tt.consumer.acceptAfter)
83+
}
84+
} else if tt.consumer.acceptAfter > 0 {
85+
assert.Less(t, tt.consumer.rejectCount.Load(), tt.consumer.acceptAfter)
86+
}
87+
})
88+
}
89+
}
90+
91+
func TestConsumeLogs_ContextDeadline(t *testing.T) {
92+
consumer := NewLogs(Config{
93+
Enabled: true,
94+
InitialInterval: 1 * time.Millisecond,
95+
MaxInterval: 5 * time.Millisecond,
96+
MaxElapsedTime: 50 * time.Millisecond,
97+
}, zap.NewNop(), NewMockLogsRejecter(10))
98+
99+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
100+
defer cancel()
101+
err := consumer.ConsumeLogs(ctx, testdata.GenerateLogsTwoLogRecordsSameResource())
102+
assert.Error(t, err)
103+
assert.Contains(t, err.Error(), "context is cancelled or timed out retry later")
104+
}
105+
106+
func TestConsumeLogs_PartialRetry(t *testing.T) {
107+
sink := &mockPartialLogsRejecter{}
108+
consumer := NewLogs(Config{
109+
Enabled: true,
110+
InitialInterval: 1 * time.Millisecond,
111+
MaxInterval: 5 * time.Millisecond,
112+
MaxElapsedTime: 50 * time.Millisecond,
113+
}, zap.NewNop(), sink)
114+
115+
logs := testdata.GenerateLogsTwoLogRecordsSameResource()
116+
testdata.GenerateLogsOneLogRecordNoResource().ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
117+
assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs))
118+
119+
// Verify the logs batch is broken into two parts, one with the partial error and one without.
120+
assert.Equal(t, 2, len(sink.AllLogs()))
121+
assert.Equal(t, 1, sink.AllLogs()[0].ResourceLogs().Len())
122+
assert.Equal(t, 2, sink.AllLogs()[0].LogRecordCount())
123+
assert.Equal(t, 1, sink.AllLogs()[1].ResourceLogs().Len())
124+
assert.Equal(t, 1, sink.AllLogs()[1].LogRecordCount())
125+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package consumerretry // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/consumerretry"
16+
17+
import (
18+
"context"
19+
"errors"
20+
"sync/atomic"
21+
22+
"go.opentelemetry.io/collector/consumer/consumererror"
23+
"go.opentelemetry.io/collector/consumer/consumertest"
24+
"go.opentelemetry.io/collector/pdata/plog"
25+
)
26+
27+
// MockLogsRejecter is a mock logs consumer built on consumertest.LogsSink. Its ConsumeLogs method rejects
// a configured number of calls before handing batches to the embedded sink.
type MockLogsRejecter struct {
28+
consumertest.LogsSink
29+
// rejectCount counts the calls that have been rejected with the "retry later" error so far.
rejectCount *atomic.Int32
30+
// acceptAfter is the number of rejections before batches are accepted; a negative value makes
// every call fail with a permanent error.
acceptAfter int32
31+
}
32+
33+
// NewMockLogsRejecter creates new MockLogsRejecter. acceptAfter is a number of rejects before accepting,
34+
// 0 means always accept, -1 means always reject with permanent error
35+
func NewMockLogsRejecter(acceptAfter int32) *MockLogsRejecter {
36+
return &MockLogsRejecter{
37+
acceptAfter: acceptAfter,
38+
rejectCount: &atomic.Int32{},
39+
}
40+
}
41+
42+
func (m *MockLogsRejecter) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
43+
if m.acceptAfter < 0 {
44+
return consumererror.NewPermanent(errors.New("permanent error"))
45+
}
46+
if m.rejectCount.Load() < m.acceptAfter {
47+
m.rejectCount.Add(1)
48+
return errors.New("retry later")
49+
}
50+
return m.LogsSink.ConsumeLogs(ctx, logs)
51+
}
52+
53+
// mockPartialLogsRejecter is a mock logs consumer built on consumertest.LogsSink. For a batch with more
// than one ResourceLogs entry it stores only the first entry and rejects the rest through a
// consumererror.Logs; a batch with at most one entry is stored whole.
54+
type mockPartialLogsRejecter struct {
55+
consumertest.LogsSink
56+
}
57+
58+
func (m *mockPartialLogsRejecter) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
59+
if logs.ResourceLogs().Len() <= 1 {
60+
return m.LogsSink.ConsumeLogs(ctx, logs)
61+
}
62+
accepted := plog.NewLogs()
63+
rejected := plog.NewLogs()
64+
logs.ResourceLogs().At(0).CopyTo(accepted.ResourceLogs().AppendEmpty())
65+
for i := 1; i < logs.ResourceLogs().Len(); i++ {
66+
logs.ResourceLogs().At(i).CopyTo(rejected.ResourceLogs().AppendEmpty())
67+
}
68+
_ = m.LogsSink.ConsumeLogs(ctx, accepted)
69+
return consumererror.NewLogs(errors.New("partial error"), rejected)
70+
}

0 commit comments

Comments
 (0)