Skip to content

Commit 13d6054

Browse files
change LogEmitter to process entries in batches
Instead of adding each entry in a batch separately to the LogEmitter's buffer, add the whole batch in one operation.
1 parent 477f67b commit 13d6054

File tree

1 file changed

+18
-2
lines changed

1 file changed

+18
-2
lines changed

pkg/stanza/operator/helper/emitter.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,24 @@ func (e *LogEmitter) Stop() error {
9696

9797
// ProcessBatch emits the entries to the consumerFunc
9898
func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
99-
for i := range entries {
100-
_ = e.Process(ctx, entries[i])
99+
if oldBatch := e.appendEntries(entries); len(oldBatch) > 0 {
100+
e.consumerFunc(ctx, oldBatch)
101+
}
102+
103+
return nil
104+
}
105+
106+
// appendEntry appends the entry to the current batch. If maxBatchSize is reached, a new batch will be made, and the old batch
107+
// (which should be flushed) will be returned
108+
func (e *LogEmitter) appendEntries(entries []*entry.Entry) []*entry.Entry {
109+
e.batchMux.Lock()
110+
defer e.batchMux.Unlock()
111+
112+
e.batch = append(e.batch, entries...)
113+
if uint(len(e.batch)) >= e.maxBatchSize {
114+
var oldBatch []*entry.Entry
115+
oldBatch, e.batch = e.batch, make([]*entry.Entry, 0, e.maxBatchSize)
116+
return oldBatch
101117
}
102118

103119
return nil

0 commit comments

Comments
 (0)