Skip to content

Commit f4c41b1

Browse files
namco1992djaglowski
authored andcommitted
[chore][pkg/stanza] Fix the bug that the log emitter might hang when the receiver retry indefinitely (open-telemetry#37159)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description I was exploring options for backpressure the pipeline when the exporter fails. Inspired by open-telemetry#29410 (comment), I realized that I could enable the `retry_on_failure` on the receiver side, and have it retry indefinitely by setting `max_elapsed_time` to 0. ```yaml receivers: filelog: include: [ input.log ] retry_on_failure: enabled: true max_elapsed_time: 0 ``` With this config, the consumer will be blocked at the `ConsumeLogs` func in `consumerretry` when the exporter fails to consume the logs: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/12551d324375bd0c4647a8cdc7bd0f8c435c1034/internal/coreinternal/consumerretry/logs.go#L35 The func `flusher()` from the `LogEmitter` starts a loop and call the `consumerFunc` with `context.Background()`. When the `ConsumeLogs` is blocked by the retry, there is no way to cancel the retry, thus the `LogEmitter` will hang when I try to shut down the collector. In this PR, I created a ctx in the `Start` func, which will be cancelled later in the `Shutdown` func. The ctx is passed to the flusher and used for the flush in every `flushInterval`. However, I have to swap it with another ctx with timeout during shutdown to flush the remaining batch out one last time. That's the best approach I can think of for now, and I'm open to other suggestions. --------- Signed-off-by: Mengnan Gong <[email protected]> Co-authored-by: Daniel Jaglowski <[email protected]>
1 parent d106775 commit f4c41b1

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

pkg/stanza/operator/helper/emitter.go

+17-8
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
// LogEmitter is a stanza operator that emits log entries to the consumer callback function `consumerFunc`
1818
type LogEmitter struct {
1919
OutputOperator
20-
closeChan chan struct{}
20+
cancel context.CancelFunc
2121
stopOnce sync.Once
2222
batchMux sync.Mutex
2323
batch []*entry.Entry
@@ -65,7 +65,6 @@ func NewLogEmitter(set component.TelemetrySettings, consumerFunc func(context.Co
6565
op, _ := NewOutputConfig("log_emitter", "log_emitter").Build(set)
6666
e := &LogEmitter{
6767
OutputOperator: op,
68-
closeChan: make(chan struct{}),
6968
maxBatchSize: defaultMaxBatchSize,
7069
batch: make([]*entry.Entry, 0, defaultMaxBatchSize),
7170
flushInterval: defaultFlushInterval,
@@ -79,15 +78,21 @@ func NewLogEmitter(set component.TelemetrySettings, consumerFunc func(context.Co
7978

8079
// Start starts the goroutine(s) required for this operator
8180
func (e *LogEmitter) Start(_ operator.Persister) error {
81+
ctx, cancel := context.WithCancel(context.Background())
82+
e.cancel = cancel
83+
8284
e.wg.Add(1)
83-
go e.flusher()
85+
go e.flusher(ctx)
8486
return nil
8587
}
8688

8789
// Stop will close the log channel and stop running goroutines
8890
func (e *LogEmitter) Stop() error {
8991
e.stopOnce.Do(func() {
90-
close(e.closeChan)
92+
// the cancel func could be nil if the emitter is never started.
93+
if e.cancel != nil {
94+
e.cancel()
95+
}
9196
e.wg.Wait()
9297
})
9398

@@ -120,7 +125,7 @@ func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry {
120125
}
121126

122127
// flusher flushes the current batch every flush interval. Intended to be run as a goroutine
123-
func (e *LogEmitter) flusher() {
128+
func (e *LogEmitter) flusher(ctx context.Context) {
124129
defer e.wg.Done()
125130

126131
ticker := time.NewTicker(e.flushInterval)
@@ -130,12 +135,16 @@ func (e *LogEmitter) flusher() {
130135
select {
131136
case <-ticker.C:
132137
if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
133-
e.consumerFunc(context.Background(), oldBatch)
138+
e.consumerFunc(ctx, oldBatch)
134139
}
135-
case <-e.closeChan:
140+
case <-ctx.Done():
141+
// Create a new context with timeout for the final flush
142+
flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
143+
defer cancel()
144+
136145
// flush currently batched entries
137146
if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
138-
e.consumerFunc(context.Background(), oldBatch)
147+
e.consumerFunc(flushCtx, oldBatch)
139148
}
140149
return
141150
}

0 commit comments

Comments
 (0)