Skip to content

Commit 2760116

Browse files
[pkg/stanza] Introduce batching logs in File consumer (#36663)
#### Description Modifies the File consumer to emit logs in batches as opposed to sending each log individually through the Stanza pipeline and on to the Log Emitter. Here are the changes introduced: - 6b4c9fe Changed the `Reader::ReadToEnd` method in File consumer to collect the tokens scanned from the file into batches. At this point, the Reader still emits each token individually, as the `emit.Callback` function only accepts a single token. - c206995 Changed `emit.Callback` function signature to accept a slice of tokens as opposed to a single token, and changed the Reader to emit a batch of tokens in one request. At this point, the batches are still split into individual tokens inside the `emit` function, because the Stanza operators can only process one entry at a time. - aedda3a Added `ProcessBatch` method to Stanza operators and used it in the `emit` function. At this point, the batch of tokens is translated to a batch of entries and passed to Log Emitter as a whole batch. The batch is still split in the Log Emitter, which calls `consumeFunc` for each entry in a loop. - 13d6054 Changed the LogEmitter to add the whole batch to its buffer, as opposed to adding entries one by one. **Slice of entries `[]entry.Entry` vs. slice of pointers `[]*entry.Entry`** I considered whether the `ProcessBatch` method in the `Operator` interface should accept a slice of structs `[]entry.Entry` or a slice of pointers `[]*entry.Entry`. I ran some tests (similar to #35454) and they showed a 7-10% performance loss when using a slice of structs vs. a slice of pointers. That's why I decided to use the slice of pointers `[]*entry.Entry`. #### Link to tracking issue - Fixes #35455 #### Testing No changes in tests. The goal is for the functionality to not change and for performance to not decrease. I have added a new benchmark in a separate PR #38054 that should be helpful in assessing the performance impact of this change. 
#### Documentation These are internal changes, no user documentation needs changing.
1 parent 7a7d61b commit 2760116

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+454
-186
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/stanza
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add method `ProcessBatch` to `Operator` interface in `pkg/stanza/operator` package to support batch processing.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35455]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/stanza
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Change signature of `emit.Callback` function in `pkg/stanza/fileconsumer/emit` package to emit multiple tokens.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35455]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]

pkg/stanza/adapter/mocks_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ func (o *UnstartableOperator) Start(_ operator.Persister) error {
5050
return errors.New("something very unusual happened")
5151
}
5252

53+
func (o *UnstartableOperator) ProcessBatch(_ context.Context, _ []*entry.Entry) error {
54+
return nil
55+
}
56+
5357
// Process will return nil
5458
func (o *UnstartableOperator) Process(_ context.Context, _ *entry.Entry) error {
5559
return nil

pkg/stanza/fileconsumer/benchmark_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/stretchr/testify/require"
1717
"go.opentelemetry.io/collector/component/componenttest"
1818

19-
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
2019
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
2120
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/internal/filetest"
2221
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
@@ -188,8 +187,8 @@ func BenchmarkFileInput(b *testing.B) {
188187
cfg.PollInterval = time.Microsecond
189188

190189
doneChan := make(chan bool, len(files))
191-
callback := func(_ context.Context, token emit.Token) error {
192-
if len(token.Body) == 0 {
190+
callback := func(_ context.Context, tokens [][]byte, _ map[string]any, _ int64) error {
191+
if len(tokens) > 0 && len(tokens[len(tokens)-1]) == 0 {
193192
doneChan <- true
194193
}
195194
return nil

pkg/stanza/fileconsumer/emit/emit.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"context"
88
)
99

10-
type Callback func(ctx context.Context, token Token) error
10+
type Callback func(ctx context.Context, tokens [][]byte, attributes map[string]any, lastRecordNumber int64) error
1111

1212
type Token struct {
1313
Body []byte

pkg/stanza/fileconsumer/file_test.go

-75
Original file line numberDiff line numberDiff line change
@@ -1580,78 +1580,3 @@ func TestReadGzipCompressedLogsFromEnd(t *testing.T) {
15801580
operator.poll(context.TODO())
15811581
sink.ExpectToken(t, []byte("testlog4"))
15821582
}
1583-
1584-
func TestIncludeFileRecordNumber(t *testing.T) {
1585-
t.Parallel()
1586-
1587-
tempDir := t.TempDir()
1588-
cfg := NewConfig().includeDir(tempDir)
1589-
cfg.StartAt = "beginning"
1590-
cfg.IncludeFileRecordNumber = true
1591-
operator, sink := testManager(t, cfg)
1592-
1593-
// Create a file, then start
1594-
temp := filetest.OpenTemp(t, tempDir)
1595-
filetest.WriteString(t, temp, "testlog1\n")
1596-
1597-
require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
1598-
defer func() {
1599-
require.NoError(t, operator.Stop())
1600-
}()
1601-
1602-
sink.ExpectCall(t, []byte("testlog1"), map[string]any{
1603-
attrs.LogFileName: filepath.Base(temp.Name()),
1604-
attrs.LogFileRecordNumber: int64(1),
1605-
})
1606-
}
1607-
1608-
func TestIncludeFileRecordNumberWithHeaderConfigured(t *testing.T) {
1609-
t.Parallel()
1610-
1611-
tempDir := t.TempDir()
1612-
cfg := NewConfig().includeDir(tempDir)
1613-
cfg.StartAt = "beginning"
1614-
cfg.IncludeFileRecordNumber = true
1615-
cfg = cfg.withHeader("^#", "(?P<header_attr>[A-z]+)")
1616-
operator, sink := testManager(t, cfg)
1617-
1618-
// Create a file, then start
1619-
temp := filetest.OpenTemp(t, tempDir)
1620-
filetest.WriteString(t, temp, "#abc\n#xyz: headerValue2\ntestlog1\n")
1621-
1622-
require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
1623-
defer func() {
1624-
require.NoError(t, operator.Stop())
1625-
}()
1626-
1627-
sink.ExpectCall(t, []byte("testlog1"), map[string]any{
1628-
attrs.LogFileName: filepath.Base(temp.Name()),
1629-
attrs.LogFileRecordNumber: int64(1),
1630-
"header_attr": "xyz",
1631-
})
1632-
}
1633-
1634-
func TestIncludeFileRecordNumberWithHeaderConfiguredButMissing(t *testing.T) {
1635-
t.Parallel()
1636-
1637-
tempDir := t.TempDir()
1638-
cfg := NewConfig().includeDir(tempDir)
1639-
cfg.StartAt = "beginning"
1640-
cfg.IncludeFileRecordNumber = true
1641-
cfg = cfg.withHeader("^#", "(?P<header_key>[A-z]+): (?P<header_value>[A-z]+)")
1642-
operator, sink := testManager(t, cfg)
1643-
1644-
// Create a file, then start
1645-
temp := filetest.OpenTemp(t, tempDir)
1646-
filetest.WriteString(t, temp, "testlog1\n")
1647-
1648-
require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
1649-
defer func() {
1650-
require.NoError(t, operator.Stop())
1651-
}()
1652-
1653-
sink.ExpectCall(t, []byte("testlog1"), map[string]any{
1654-
attrs.LogFileName: filepath.Base(temp.Name()),
1655-
attrs.LogFileRecordNumber: int64(1),
1656-
})
1657-
}

pkg/stanza/fileconsumer/internal/emittest/nop.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ package emittest // import "github.com/open-telemetry/opentelemetry-collector-co
55

66
import (
77
"context"
8-
9-
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
108
)
119

12-
func Nop(_ context.Context, _ emit.Token) error {
10+
func Nop(_ context.Context, _ [][]byte, _ map[string]any, _ int64) error {
1311
return nil
1412
}

pkg/stanza/fileconsumer/internal/emittest/nop_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@ import (
88
"testing"
99

1010
"github.com/stretchr/testify/require"
11-
12-
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
1311
)
1412

1513
func TestNop(t *testing.T) {
16-
require.NoError(t, Nop(context.Background(), emit.Token{}))
14+
require.NoError(t, Nop(context.Background(), [][]byte{}, map[string]any{}, int64(0)))
1715
}

pkg/stanza/fileconsumer/internal/emittest/sink.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,13 @@ func NewSink(opts ...SinkOpt) *Sink {
5252
return &Sink{
5353
emitChan: emitChan,
5454
timeout: cfg.timeout,
55-
Callback: func(ctx context.Context, token emit.Token) error {
56-
select {
57-
case <-ctx.Done():
58-
return ctx.Err()
59-
case emitChan <- token:
55+
Callback: func(ctx context.Context, tokens [][]byte, attributes map[string]any, _ int64) error {
56+
for _, token := range tokens {
57+
select {
58+
case <-ctx.Done():
59+
return ctx.Err()
60+
case emitChan <- emit.NewToken(token, attributes):
61+
}
6062
}
6163
return nil
6264
},

pkg/stanza/fileconsumer/internal/emittest/sink_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []emit.Token) {
204204
}
205205
go func() {
206206
for _, c := range testCalls {
207-
assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Body, c.Attributes)))
207+
assert.NoError(t, s.Callback(context.Background(), [][]byte{c.Body}, c.Attributes, 0))
208208
}
209209
}()
210210
return s, testCalls

pkg/stanza/fileconsumer/internal/header/output.go

+7
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ func newPipelineOutput(set component.TelemetrySettings) *pipelineOutput {
3030
}
3131
}
3232

33+
func (e *pipelineOutput) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
34+
for i := range entries {
35+
_ = e.Process(ctx, entries[i])
36+
}
37+
return nil
38+
}
39+
3340
// Drop the entry if logChan is full, in order to avoid this operator blocking.
3441
// This protects against a case where an operator could return an error, but continue propagating a log entry,
3542
// leaving an unexpected entry in the output channel.

pkg/stanza/fileconsumer/internal/reader/factory.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ import (
2424
)
2525

2626
const (
27-
DefaultMaxLogSize = 1024 * 1024
28-
DefaultFlushPeriod = 500 * time.Millisecond
27+
DefaultMaxLogSize = 1024 * 1024
28+
DefaultFlushPeriod = 500 * time.Millisecond
29+
DefaultMaxBatchSize = 100
2930
)
3031

3132
type Factory struct {
@@ -81,6 +82,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
8182
includeFileRecordNum: f.IncludeFileRecordNumber,
8283
compression: f.Compression,
8384
acquireFSLock: f.AcquireFSLock,
85+
maxBatchSize: DefaultMaxBatchSize,
8486
emitFunc: f.EmitFunc,
8587
}
8688
r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

pkg/stanza/fileconsumer/internal/reader/reader.go

+21-8
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"golang.org/x/text/encoding"
1717

1818
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils"
19-
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs"
2019
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
2120
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
2221
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
@@ -55,6 +54,7 @@ type Reader struct {
5554
includeFileRecordNum bool
5655
compression string
5756
acquireFSLock bool
57+
maxBatchSize int
5858
}
5959

6060
// ReadToEnd will read until the end of the file
@@ -188,6 +188,8 @@ func (r *Reader) readContents(ctx context.Context) {
188188

189189
s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)
190190

191+
tokenBodies := make([][]byte, r.maxBatchSize)
192+
numTokensBatched := 0
191193
// Iterate over the contents of the file.
192194
for {
193195
select {
@@ -203,27 +205,38 @@ func (r *Reader) readContents(ctx context.Context) {
203205
} else if r.deleteAtEOF {
204206
r.delete()
205207
}
208+
209+
if numTokensBatched > 0 {
210+
err := r.emitFunc(ctx, tokenBodies[:numTokensBatched], r.FileAttributes, r.RecordNum)
211+
if err != nil {
212+
r.set.Logger.Error("failed to emit token", zap.Error(err))
213+
}
214+
r.Offset = s.Pos()
215+
}
206216
return
207217
}
208218

209-
token, err := r.decoder.Bytes(s.Bytes())
219+
var err error
220+
tokenBodies[numTokensBatched], err = r.decoder.Bytes(s.Bytes())
210221
if err != nil {
211222
r.set.Logger.Error("failed to decode token", zap.Error(err))
212223
r.Offset = s.Pos() // move past the bad token or we may be stuck
213224
continue
214225
}
226+
numTokensBatched++
215227

216228
if r.includeFileRecordNum {
217229
r.RecordNum++
218-
r.FileAttributes[attrs.LogFileRecordNumber] = r.RecordNum
219230
}
220231

221-
err = r.emitFunc(ctx, emit.NewToken(token, r.FileAttributes))
222-
if err != nil {
223-
r.set.Logger.Error("failed to process token", zap.Error(err))
232+
if r.maxBatchSize > 0 && numTokensBatched >= r.maxBatchSize {
233+
err := r.emitFunc(ctx, tokenBodies[:numTokensBatched], r.FileAttributes, r.RecordNum)
234+
if err != nil {
235+
r.set.Logger.Error("failed to emit token", zap.Error(err))
236+
}
237+
numTokensBatched = 0
238+
r.Offset = s.Pos()
224239
}
225-
226-
r.Offset = s.Pos()
227240
}
228241
}
229242

pkg/stanza/fileconsumer/internal/reader/reader_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -347,10 +347,8 @@ func BenchmarkFileRead(b *testing.B) {
347347

348348
// Use a long flush period to ensure it does not expire DURING a ReadToEnd
349349
counter := atomic.Int64{}
350-
f := newTestFactory(b, func(_ context.Context, token emit.Token) error {
351-
if len(token.Body) != 0 {
352-
counter.Add(1)
353-
}
350+
f := newTestFactory(b, func(_ context.Context, tokens [][]byte, _ map[string]any, _ int64) error {
351+
counter.Add(int64(len(tokens)))
354352
return nil
355353
})
356354
b.ReportAllocs()

pkg/stanza/operator/helper/emitter.go

+25
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,31 @@ func (e *LogEmitter) Stop() error {
9999
return nil
100100
}
101101

102+
// ProcessBatch emits the entries to the consumerFunc
103+
func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
104+
if oldBatch := e.appendEntries(entries); len(oldBatch) > 0 {
105+
e.consumerFunc(ctx, oldBatch)
106+
}
107+
108+
return nil
109+
}
110+
111+
// appendEntries appends the entry to the current batch. If maxBatchSize is reached, a new batch will be made, and the old batch
112+
// (which should be flushed) will be returned
113+
func (e *LogEmitter) appendEntries(entries []*entry.Entry) []*entry.Entry {
114+
e.batchMux.Lock()
115+
defer e.batchMux.Unlock()
116+
117+
e.batch = append(e.batch, entries...)
118+
if uint(len(e.batch)) >= e.maxBatchSize {
119+
var oldBatch []*entry.Entry
120+
oldBatch, e.batch = e.batch, make([]*entry.Entry, 0, e.maxBatchSize)
121+
return oldBatch
122+
}
123+
124+
return nil
125+
}
126+
102127
// Process will emit an entry to the output channel
103128
func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
104129
if oldBatch := e.appendEntry(ent); len(oldBatch) > 0 {

pkg/stanza/operator/helper/input.go

+9
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ func (i *InputOperator) CanProcess() bool {
8282
return false
8383
}
8484

85+
// ProcessBatch will always return an error if called.
86+
func (i *InputOperator) ProcessBatch(_ context.Context, _ []*entry.Entry) error {
87+
i.Logger().Error("Operator received a batch of entries, but can not process")
88+
return errors.NewError(
89+
"Operator can not process logs.",
90+
"Ensure that operator is not configured to receive logs from other operators",
91+
)
92+
}
93+
8594
// Process will always return an error if called.
8695
func (i *InputOperator) Process(_ context.Context, _ *entry.Entry) error {
8796
i.Logger().Error("Operator received an entry, but can not process")

0 commit comments

Comments
 (0)