Skip to content

Commit 8e3197b

Browse files
do not split batches in Log Emitter
This changes the Log Emitter to run the `consumerFunc` on the whole batch, instead of splitting the batch into individual entries and calling `consumerFunc` on each of them. This doesn't change much while the Log Emitter has its own `batch` buffer, but if we remove the `batch` buffer (see open-telemetry#35456), this should prevent the performance drop described in open-telemetry#35454.
1 parent 187b345 commit 8e3197b

File tree

1 file changed

+19
-3
lines changed

1 file changed

+19
-3
lines changed

pkg/stanza/operator/helper/emitter.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
104104
}
105105

106106
// ProcessBatch emits the entries to the consumerFunc
107-
func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
108-
for _, entry := range entries {
109-
e.Process(ctx, &entry)
107+
func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
108+
if oldBatch := e.appendEntries(entries); len(oldBatch) > 0 {
109+
e.consumerFunc(ctx, oldBatch)
110110
}
111111

112112
return nil
@@ -128,6 +128,22 @@ func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry {
128128
return nil
129129
}
130130

131+
// appendEntries appends the entries to the current batch. If maxBatchSize is reached, a new batch will be made, and the old batch
132+
// (which should be flushed) will be returned
133+
func (e *LogEmitter) appendEntries(entries []*entry.Entry) []*entry.Entry {
134+
e.batchMux.Lock()
135+
defer e.batchMux.Unlock()
136+
137+
e.batch = append(e.batch, entries...)
138+
if uint(len(e.batch)) >= e.maxBatchSize {
139+
var oldBatch []*entry.Entry
140+
oldBatch, e.batch = e.batch, make([]*entry.Entry, 0, e.maxBatchSize)
141+
return oldBatch
142+
}
143+
144+
return nil
145+
}
146+
131147
// flusher flushes the current batch every flush interval. Intended to be run as a goroutine
132148
func (e *LogEmitter) flusher() {
133149
defer e.wg.Done()

0 commit comments

Comments
 (0)