
Commit 9747d92

Flush data on shutdown in stanza adapter

1 parent 07e5786 commit 9747d92

File tree: 4 files changed, +85 -97 lines changed

pkg/stanza/adapter/converter.go (+26 -56)

@@ -7,7 +7,6 @@ import (
 	"context"
 	"encoding/binary"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"math"
 	"runtime"
@@ -56,8 +55,6 @@ type Converter struct {
 	pLogsChan chan plog.Logs

 	stopOnce sync.Once
-	stopChan chan struct{}
-
 	// workerChan is an internal communication channel that gets the log
 	// entries from Batch() calls and it receives the data in workerLoop().
 	workerChan chan []*entry.Entry
@@ -95,7 +92,6 @@ func NewConverter(logger *zap.Logger, opts ...converterOption) *Converter {
 		workerChan:  make(chan []*entry.Entry),
 		workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))),
 		pLogsChan:   make(chan plog.Logs),
-		stopChan:    make(chan struct{}),
 		flushChan:   make(chan plog.Logs),
 		logger:      logger,
 	}
@@ -113,15 +109,14 @@ func (c *Converter) Start() {
 		go c.workerLoop()
 	}

-	c.wg.Add(1)
 	go c.flushLoop()
 }

 func (c *Converter) Stop() {
 	c.stopOnce.Do(func() {
-		close(c.stopChan)
+		close(c.workerChan)
 		c.wg.Wait()
-		close(c.pLogsChan)
+		close(c.flushChan)
 	})
 }

@@ -136,62 +131,43 @@ func (c *Converter) OutChannel() <-chan plog.Logs {
 func (c *Converter) workerLoop() {
 	defer c.wg.Done()

-	for {
-
-		select {
-		case <-c.stopChan:
-			return
+	for entries := range c.workerChan {
+		resourceHashToIdx := make(map[uint64]int)

-		case entries, ok := <-c.workerChan:
+		pLogs := plog.NewLogs()
+		var sl plog.ScopeLogs
+		for _, e := range entries {
+			resourceID := HashResource(e.Resource)
+			resourceIdx, ok := resourceHashToIdx[resourceID]
 			if !ok {
-				return
-			}
-
-			resourceHashToIdx := make(map[uint64]int)
-
-			pLogs := plog.NewLogs()
-			var sl plog.ScopeLogs
-			for _, e := range entries {
-				resourceID := HashResource(e.Resource)
-				resourceIdx, ok := resourceHashToIdx[resourceID]
-				if !ok {
-					resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
-					rl := pLogs.ResourceLogs().AppendEmpty()
-					upsertToMap(e.Resource, rl.Resource().Attributes())
-					sl = rl.ScopeLogs().AppendEmpty()
-				} else {
-					sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(0)
-				}
-				convertInto(e, sl.LogRecords().AppendEmpty())
-			}
-
-			// Send plogs directly to flushChan
-			select {
-			case c.flushChan <- pLogs:
-			case <-c.stopChan:
+				resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
+				rl := pLogs.ResourceLogs().AppendEmpty()
+				upsertToMap(e.Resource, rl.Resource().Attributes())
+				sl = rl.ScopeLogs().AppendEmpty()
+			} else {
+				sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(0)
 			}
+			convertInto(e, sl.LogRecords().AppendEmpty())
 		}
+
+		// Send plogs directly to flushChan
+		c.flushChan <- pLogs
 	}
 }

 func (c *Converter) flushLoop() {
-	defer c.wg.Done()
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	for {
-		select {
-		case <-c.stopChan:
-			return
-
-		case pLogs := <-c.flushChan:
-			if err := c.flush(ctx, pLogs); err != nil {
-				c.logger.Debug("Problem sending log entries",
-					zap.Error(err),
-				)
-			}
+	for pLogs := range c.flushChan {
+		if err := c.flush(ctx, pLogs); err != nil {
+			c.logger.Debug("Problem sending log entries",
+				zap.Error(err),
			)
 		}
 	}
+
+	close(c.pLogsChan)
 }

 // flush flushes provided plog.Logs entries onto a channel.
@@ -203,10 +179,6 @@ func (c *Converter) flush(ctx context.Context, pLogs plog.Logs) error {
 		return fmt.Errorf("flushing log entries interrupted, err: %w", ctx.Err())

 	case c.pLogsChan <- pLogs:
-
-	// The converter has been stopped so bail the flush.
-	case <-c.stopChan:
-		return errors.New("logs converter has been stopped")
 	}

 	return nil
@@ -217,8 +189,6 @@ func (c *Converter) Batch(e []*entry.Entry) error {
 	select {
 	case c.workerChan <- e:
 		return nil
-	case <-c.stopChan:
-		return errors.New("logs converter has been stopped")
 	}
 }

pkg/stanza/adapter/emitter.go (+4 -4)

@@ -149,17 +149,17 @@ func (e *LogEmitter) flusher(ctx context.Context) {
 				e.flush(ctx, oldBatch)
 			}
 		case <-ctx.Done():
+			if oldBatch := e.makeNewBatch(); len(oldBatch) > 0 {
+				e.flush(ctx, oldBatch)
+			}
 			return
 		}
 	}
 }

 // flush flushes the provided batch to the log channel.
 func (e *LogEmitter) flush(ctx context.Context, batch []*entry.Entry) {
-	select {
-	case e.logChan <- batch:
-	case <-ctx.Done():
-	}
+	e.logChan <- batch
 }

 // makeNewBatch replaces the current batch on the log emitter with a new batch, returning the old one
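On the emitter side, the flusher now writes out whatever is still batched when its context is cancelled, and flush no longer races ctx.Done() against the send, since the receiver keeps reading logChan until shutdown finishes. A stripped-down sketch of that drain-on-cancel behaviour follows; the emitter type, ticker interval, and the closing of logChan inside the flusher are simplifying assumptions, not the adapter's actual LogEmitter.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Toy emitter: batches strings and flushes them to logChan on a ticker,
// plus once more on context cancellation so the last batch is not lost.
type emitter struct {
	mu      sync.Mutex
	batch   []string
	logChan chan []string
}

func (e *emitter) add(s string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.batch = append(e.batch, s)
}

// takeBatch swaps out the current batch, in the spirit of makeNewBatch.
func (e *emitter) takeBatch() []string {
	e.mu.Lock()
	defer e.mu.Unlock()
	old := e.batch
	e.batch = nil
	return old
}

func (e *emitter) flusher(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if b := e.takeBatch(); len(b) > 0 {
				e.logChan <- b
			}
		case <-ctx.Done():
			// Drain the final partial batch before exiting.
			if b := e.takeBatch(); len(b) > 0 {
				e.logChan <- b
			}
			close(e.logChan) // let the reader finish its range loop
			return
		}
	}
}

func main() {
	e := &emitter{logChan: make(chan []string)}
	ctx, cancel := context.WithCancel(context.Background())
	go e.flusher(ctx)

	e.add("a")
	e.add("b")
	cancel() // shutdown: the pending batch {"a", "b"} is still delivered

	for b := range e.logChan {
		fmt.Println(b)
	}
}
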

pkg/stanza/adapter/receiver.go (+12 -37)

@@ -83,51 +83,27 @@ func (r *receiver) emitterLoop(ctx context.Context) {
 	defer r.wg.Done()

 	// Don't create done channel on every iteration.
-	doneChan := ctx.Done()
-	for {
-		select {
-		case <-doneChan:
-			r.logger.Debug("Receive loop stopped")
-			return
-
-		case e, ok := <-r.emitter.logChan:
-			if !ok {
-				continue
-			}
-
-			if err := r.converter.Batch(e); err != nil {
-				r.logger.Error("Could not add entry to batch", zap.Error(err))
-			}
+	for e := range r.emitter.logChan {
+		if err := r.converter.Batch(e); err != nil {
+			r.logger.Error("Could not add entry to batch", zap.Error(err))
 		}
 	}
+	r.converter.Stop()
 }

 // consumerLoop reads converter log entries and calls the consumer to consumer them.
 func (r *receiver) consumerLoop(ctx context.Context) {
 	defer r.wg.Done()

-	// Don't create done channel on every iteration.
-	doneChan := ctx.Done()
 	pLogsChan := r.converter.OutChannel()
-	for {
-		select {
-		case <-doneChan:
-			r.logger.Debug("Consumer loop stopped")
-			return
-
-		case pLogs, ok := <-pLogsChan:
-			if !ok {
-				r.logger.Debug("Converter channel got closed")
-				continue
-			}
-			obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
-			logRecordCount := pLogs.LogRecordCount()
-			cErr := r.consumer.ConsumeLogs(ctx, pLogs)
-			if cErr != nil {
-				r.logger.Error("ConsumeLogs() failed", zap.Error(cErr))
-			}
-			r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
+	for pLogs := range pLogsChan {
+		obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
+		logRecordCount := pLogs.LogRecordCount()
+		cErr := r.consumer.ConsumeLogs(ctx, pLogs)
+		if cErr != nil {
+			r.logger.Error("ConsumeLogs() failed", zap.Error(cErr))
 		}
+		r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
 	}
 }

@@ -139,8 +115,7 @@ func (r *receiver) Shutdown(ctx context.Context) error {

 	r.logger.Info("Stopping stanza receiver")
 	pipelineErr := r.pipe.Stop()
-	r.converter.Stop()
-	r.cancel()
+	// r.cancel()
 	r.wg.Wait()

 	if r.storageClient != nil {
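The receiver now lets teardown ride the data path instead of a cancelled context: pipe.Stop() shuts down the stanza pipeline, emitterLoop drains the emitter's logChan until it is closed and then calls converter.Stop(), consumerLoop keeps consuming until the converter closes its output channel, and wg.Wait() returns only once both loops have exited. A rough sketch of that ordering with placeholder channels (no stanza or converter types involved, channel names are hypothetical):

package main

import (
	"fmt"
	"sync"
)

// Shutdown ordering sketch: stop the producer, let each loop drain its
// channel, and rely on wg.Wait() instead of cancelling the loops.
func main() {
	logChan := make(chan string) // emitter output -> emitterLoop
	outChan := make(chan string) // converter output -> consumerLoop
	var wg sync.WaitGroup

	// emitterLoop analogue: drain logChan, then stop the next stage
	// (closing outChan here stands in for converter.Stop()).
	wg.Add(1)
	go func() {
		defer wg.Done()
		for e := range logChan {
			outChan <- "batched:" + e
		}
		close(outChan)
	}()

	// consumerLoop analogue: consume until the converter output is closed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for p := range outChan {
			fmt.Println("consumed", p)
		}
	}()

	// Producer, standing in for the stanza pipeline.
	go func() {
		for i := 0; i < 3; i++ {
			logChan <- fmt.Sprintf("entry-%d", i)
		}
		close(logChan) // pipe.Stop(): no more entries will arrive
	}()

	wg.Wait() // returns only after both loops have drained everything
	fmt.Println("shutdown complete, nothing dropped")
}
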

pkg/stanza/adapter/receiver_test.go (+43)

@@ -8,9 +8,11 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"sync/atomic"
 	"testing"
 	"time"

+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
@@ -93,6 +95,47 @@ func TestHandleConsume(t *testing.T) {
 	require.NoError(t, logsReceiver.Shutdown(context.Background()))
 }

+func TestShutdownFlush(t *testing.T) {
+	mockConsumer := &consumertest.LogsSink{}
+	factory := NewFactory(TestReceiverType{}, component.StabilityLevelDevelopment)
+
+	logsReceiver, err := factory.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), factory.CreateDefaultConfig(), mockConsumer)
+	require.NoError(t, err, "receiver should successfully build")
+
+	err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
+	require.NoError(t, err, "receiver start failed")
+
+	var consumedLogCount atomic.Int32
+	closeCh := make(chan struct{})
+	stanzaReceiver := logsReceiver.(*receiver)
+	go func() {
+		for {
+			select {
+			case <-closeCh:
+				require.NoError(t, logsReceiver.Shutdown(context.Background()))
+				return
+			default:
+				err := stanzaReceiver.emitter.Process(context.Background(), entry.New())
+				require.NoError(t, err)
+			}
+			consumedLogCount.Add(1)
+		}
+	}()
+	require.Eventually(t, func() bool {
+		return consumedLogCount.Load() > 100
+	}, 5*time.Second, 5*time.Millisecond)
+
+	close(closeCh)
+
+	// Eventually because of asynchronuous nature of the receiver.
+	require.EventuallyWithT(t,
+		func(t *assert.CollectT) {
+			assert.Equal(t, consumedLogCount.Load(), int32(mockConsumer.LogRecordCount()))
+		},
+		5*time.Second, 5*time.Millisecond,
+	)
+}
+
 func TestHandleConsumeRetry(t *testing.T) {
 	mockConsumer := consumerretry.NewMockLogsRejecter(2)
 	factory := NewFactory(TestReceiverType{}, component.StabilityLevelDevelopment)
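To exercise just the new test locally, something along these lines should work, assuming pkg/stanza is checked out as its own Go module as in the contrib repository layout (standard go test flags, nothing commit-specific):

	cd pkg/stanza && go test -race -run TestShutdownFlush ./adapter/...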
