Skip to content

Commit aedda3a

Browse files
add WriterOperator::WriteBatch and Operator::ProcessBatch methods
The File input's `emitBatch` function now calls `ProcessBatch` instead of `Process`. The added `ProcessBatch` method will make each Stanza operator capable of accepting a batch of entries. At this stage, all the implementations of `ProcessBatch` just call `Process` in a loop.
1 parent c206995 commit aedda3a

File tree

37 files changed

+341
-12
lines changed

37 files changed

+341
-12
lines changed

pkg/stanza/adapter/mocks_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ func (o *UnstartableOperator) Start(_ operator.Persister) error {
5050
return errors.New("something very unusual happened")
5151
}
5252

53+
func (o *UnstartableOperator) ProcessBatch(_ context.Context, _ []entry.Entry) error {
54+
return nil
55+
}
56+
5357
// Process will return nil
5458
func (o *UnstartableOperator) Process(_ context.Context, _ *entry.Entry) error {
5559
return nil

pkg/stanza/fileconsumer/internal/header/output.go

+7
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ func newPipelineOutput(set component.TelemetrySettings) *pipelineOutput {
3030
}
3131
}
3232

33+
func (e *pipelineOutput) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
34+
for i := range entries {
35+
_ = e.Process(ctx, &entries[i])
36+
}
37+
return nil
38+
}
39+
3340
// Drop the entry if logChan is full, in order to avoid this operator blocking.
3441
// This protects against a case where an operator could return an error, but continue propagating a log entry,
3542
// leaving an unexpected entry in the output channel.

pkg/stanza/operator/helper/emitter.go

+9
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,15 @@ func (e *LogEmitter) Stop() error {
9494
return nil
9595
}
9696

97+
// ProcessBatch emits the entries to the consumerFunc
98+
func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
99+
for i := range entries {
100+
_ = e.Process(ctx, &entries[i])
101+
}
102+
103+
return nil
104+
}
105+
97106
// Process will emit an entry to the output channel
98107
func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
99108
if oldBatch := e.appendEntry(ent); len(oldBatch) > 0 {

pkg/stanza/operator/helper/input.go

+9
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,15 @@ func (i *InputOperator) CanProcess() bool {
8282
return false
8383
}
8484

85+
// ProcessBatch will always return an error if called.
86+
func (i *InputOperator) ProcessBatch(_ context.Context, _ []entry.Entry) error {
87+
i.Logger().Error("Operator received a batch of entries, but can not process")
88+
return errors.NewError(
89+
"Operator can not process logs.",
90+
"Ensure that operator is not configured to receive logs from other operators",
91+
)
92+
}
93+
8594
// Process will always return an error if called.
8695
func (i *InputOperator) Process(_ context.Context, _ *entry.Entry) error {
8796
i.Logger().Error("Operator received an entry, but can not process")

pkg/stanza/operator/helper/writer.go

+19
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,25 @@ type WriterOperator struct {
4747
OutputOperators []operator.Operator
4848
}
4949

50+
// WriteBatch writes a batch of entries to the outputs of the operator.
51+
// A batch is a collection of entries that are sent in one go.
52+
func (w *WriterOperator) WriteBatch(ctx context.Context, entries []entry.Entry) error {
53+
for i, op := range w.OutputOperators {
54+
if i == len(w.OutputOperators)-1 {
55+
return op.ProcessBatch(ctx, entries)
56+
}
57+
copyOfEntries := make([]entry.Entry, 0, len(entries))
58+
for i := range entries {
59+
copyOfEntries = append(copyOfEntries, *entries[i].Copy())
60+
}
61+
err := op.ProcessBatch(ctx, copyOfEntries)
62+
if err != nil {
63+
w.Logger().Error("Failed to process entries", zap.Error(err))
64+
}
65+
}
66+
return nil
67+
}
68+
5069
// Write will write an entry to the outputs of the operator.
5170
func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) error {
5271
for i, op := range w.OutputOperators {

pkg/stanza/operator/input/file/input.go

+25-12
Original file line numberDiff line numberDiff line change
@@ -39,33 +39,46 @@ func (i *Input) Stop() error {
3939
}
4040

4141
func (i *Input) emitBatch(ctx context.Context, tokens []emit.Token) error {
42+
entries, conversionError := i.convertTokens(tokens)
43+
if conversionError != nil {
44+
conversionError = fmt.Errorf("convert tokens: %w", conversionError)
45+
}
46+
47+
consumeError := i.WriteBatch(ctx, entries)
48+
if consumeError != nil {
49+
consumeError = fmt.Errorf("consume entries: %w", consumeError)
50+
}
51+
52+
return errors.Join(conversionError, consumeError)
53+
}
54+
55+
func (i *Input) convertTokens(tokens []emit.Token) ([]entry.Entry, error) {
56+
entries := make([]entry.Entry, 0, len(tokens))
4257
var errs []error
4358
for _, token := range tokens {
44-
err := i.emit(ctx, token)
59+
if len(token.Body) == 0 {
60+
continue
61+
}
62+
entry, err := i.convertToken(token)
4563
if err != nil {
4664
errs = append(errs, err)
65+
continue
4766
}
67+
entries = append(entries, *entry)
4868
}
49-
if len(errs) > 0 {
50-
return errors.Join(errs...)
51-
}
52-
return nil
69+
return entries, errors.Join(errs...)
5370
}
5471

55-
func (i *Input) emit(ctx context.Context, token emit.Token) error {
56-
if len(token.Body) == 0 {
57-
return nil
58-
}
59-
72+
func (i *Input) convertToken(token emit.Token) (*entry.Entry, error) {
6073
ent, err := i.NewEntry(i.toBody(token.Body))
6174
if err != nil {
62-
return fmt.Errorf("create entry: %w", err)
75+
return nil, fmt.Errorf("create entry: %w", err)
6376
}
6477

6578
for k, v := range token.Attributes {
6679
if err := ent.Set(entry.NewAttributeField(k), v); err != nil {
6780
i.Logger().Error("set attribute", zap.Error(err))
6881
}
6982
}
70-
return i.Write(ctx, ent)
83+
return ent, nil
7184
}

pkg/stanza/operator/operator.go

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type Operator interface {
3636

3737
// CanProcess indicates if the operator will process entries from other operators.
3838
CanProcess() bool
39+
// ProcessBatch processes a batch of entries from an operator.
40+
ProcessBatch(context.Context, []entry.Entry) error
3941
// Process will process an entry from an operator.
4042
Process(context.Context, *entry.Entry) error
4143
// Logger returns the operator's logger

pkg/stanza/operator/output/drop/output.go

+4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ type Output struct {
1515
helper.OutputOperator
1616
}
1717

18+
func (o *Output) ProcessBatch(_ context.Context, _ []entry.Entry) error {
19+
return nil
20+
}
21+
1822
// Process will drop the incoming entry.
1923
func (o *Output) Process(_ context.Context, _ *entry.Entry) error {
2024
return nil

pkg/stanza/operator/output/file/output.go

+9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package file // import "github.com/open-telemetry/opentelemetry-collector-contri
66
import (
77
"context"
88
"encoding/json"
9+
"errors"
910
"os"
1011
"sync"
1112
"text/template"
@@ -52,6 +53,14 @@ func (o *Output) Stop() error {
5253
return nil
5354
}
5455

56+
func (o *Output) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
57+
var errs []error
58+
for i := range entries {
59+
errs = append(errs, o.Process(ctx, &entries[i]))
60+
}
61+
return errors.Join(errs...)
62+
}
63+
5564
// Process will write an entry to the output file.
5665
func (o *Output) Process(_ context.Context, entry *entry.Entry) error {
5766
o.mux.Lock()

pkg/stanza/operator/output/stdout/output.go

+9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package stdout // import "github.com/open-telemetry/opentelemetry-collector-cont
66
import (
77
"context"
88
"encoding/json"
9+
"errors"
910
"sync"
1011

1112
"go.uber.org/zap"
@@ -21,6 +22,14 @@ type Output struct {
2122
mux sync.Mutex
2223
}
2324

25+
func (o *Output) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
26+
var errs []error
27+
for i := range entries {
28+
errs = append(errs, o.Process(ctx, &entries[i]))
29+
}
30+
return errors.Join(errs...)
31+
}
32+
2433
// Process will log entries received.
2534
func (o *Output) Process(_ context.Context, entry *entry.Entry) error {
2635
o.mux.Lock()

pkg/stanza/operator/parser/container/parser.go

+8
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ type Parser struct {
6868
timeLayout string
6969
}
7070

71+
func (p *Parser) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
72+
var errs []error
73+
for i := range entries {
74+
errs = append(errs, p.Process(ctx, &entries[i]))
75+
}
76+
return errors.Join(errs...)
77+
}
78+
7179
// Process will parse an entry of Container logs
7280
func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {
7381
format := p.format

pkg/stanza/operator/parser/csv/parser.go

+9
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package csv // import "github.com/open-telemetry/opentelemetry-collector-contrib
44

55
import (
66
"context"
7+
"errors"
78
"fmt"
89
"strings"
910

@@ -27,6 +28,14 @@ type Parser struct {
2728

2829
type parseFunc func(any) (any, error)
2930

31+
func (p *Parser) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
32+
var errs []error
33+
for i := range entries {
34+
errs = append(errs, p.Process(ctx, &entries[i]))
35+
}
36+
return errors.Join(errs...)
37+
}
38+
3039
// Process will parse an entry for csv.
3140
func (p *Parser) Process(ctx context.Context, e *entry.Entry) error {
3241
// Static parse function

pkg/stanza/operator/parser/json/parser.go

+9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package json // import "github.com/open-telemetry/opentelemetry-collector-contri
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910
"strings"
1011

@@ -21,6 +22,14 @@ type Parser struct {
2122
parseInts bool
2223
}
2324

25+
func (p *Parser) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
26+
var errs []error
27+
for i := range entries {
28+
errs = append(errs, p.Process(ctx, &entries[i]))
29+
}
30+
return errors.Join(errs...)
31+
}
32+
2433
// Process will parse an entry for JSON.
2534
func (p *Parser) Process(ctx context.Context, entry *entry.Entry) error {
2635
return p.ParserOperator.ProcessWith(ctx, entry, p.parse)

pkg/stanza/operator/parser/jsonarray/parser.go

+8
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ type Parser struct {
2121

2222
type parseFunc func(any) (any, error)
2323

24+
func (p *Parser) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
25+
var errs []error
26+
for i := range entries {
27+
errs = append(errs, p.Process(ctx, &entries[i]))
28+
}
29+
return errors.Join(errs...)
30+
}
31+
2432
// Process will parse an entry for json array.
2533
func (p *Parser) Process(ctx context.Context, e *entry.Entry) error {
2634
return p.ParserOperator.ProcessWith(ctx, e, p.parse)

pkg/stanza/operator/parser/keyvalue/parser.go

+9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package keyvalue // import "github.com/open-telemetry/opentelemetry-collector-co
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910

1011
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/parseutils"
@@ -19,6 +20,14 @@ type Parser struct {
1920
pairDelimiter string
2021
}
2122

23+
func (p *Parser) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
24+
var errs []error
25+
for i := range entries {
26+
errs = append(errs, p.Process(ctx, &entries[i]))
27+
}
28+
return errors.Join(errs...)
29+
}
30+
2231
// Process will parse an entry for key value pairs.
2332
func (p *Parser) Process(ctx context.Context, entry *entry.Entry) error {
2433
return p.ParserOperator.ProcessWith(ctx, entry, p.parse)

pkg/stanza/operator/parser/regex/parser.go

+9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package regex // import "github.com/open-telemetry/opentelemetry-collector-contr
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910
"regexp"
1011

@@ -26,6 +27,14 @@ func (p *Parser) Stop() error {
2627
return nil
2728
}
2829

30+
func (p *Parser) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
31+
var errs []error
32+
for i := range entries {
33+
errs = append(errs, p.Process(ctx, &entries[i]))
34+
}
35+
return errors.Join(errs...)
36+
}
37+
2938
// Process will parse an entry for regex.
3039
func (p *Parser) Process(ctx context.Context, entry *entry.Entry) error {
3140
return p.ParserOperator.ProcessWith(ctx, entry, p.parse)

pkg/stanza/operator/parser/scope/parser.go

+9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package scope // import "github.com/open-telemetry/opentelemetry-collector-contr
55

66
import (
77
"context"
8+
"errors"
89

910
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
1011
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
@@ -16,6 +17,14 @@ type Parser struct {
1617
helper.ScopeNameParser
1718
}
1819

20+
func (p *Parser) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
21+
var errs []error
22+
for i := range entries {
23+
errs = append(errs, p.Process(ctx, &entries[i]))
24+
}
25+
return errors.Join(errs...)
26+
}
27+
1928
// Process will parse logger name from an entry.
2029
func (p *Parser) Process(ctx context.Context, entry *entry.Entry) error {
2130
return p.ProcessWith(ctx, entry, p.Parse)

pkg/stanza/operator/parser/severity/parser.go

+9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package severity // import "github.com/open-telemetry/opentelemetry-collector-co
55

66
import (
77
"context"
8+
"errors"
89

910
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
1011
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
@@ -16,6 +17,14 @@ type Parser struct {
1617
helper.SeverityParser
1718
}
1819

20+
func (p *Parser) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
21+
var errs []error
22+
for i := range entries {
23+
errs = append(errs, p.Process(ctx, &entries[i]))
24+
}
25+
return errors.Join(errs...)
26+
}
27+
1928
// Process will parse severity from an entry.
2029
func (p *Parser) Process(ctx context.Context, entry *entry.Entry) error {
2130
return p.ProcessWith(ctx, entry, p.Parse)

pkg/stanza/operator/parser/syslog/parser.go

+9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package syslog // import "github.com/open-telemetry/opentelemetry-collector-cont
66
import (
77
"bytes"
88
"context"
9+
"errors"
910
"fmt"
1011
"regexp"
1112
"time"
@@ -36,6 +37,14 @@ type Parser struct {
3637
maxOctets int
3738
}
3839

40+
func (p *Parser) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
41+
var errs []error
42+
for i := range entries {
43+
errs = append(errs, p.Process(ctx, &entries[i]))
44+
}
45+
return errors.Join(errs...)
46+
}
47+
3948
// Process will parse an entry field as syslog.
4049
func (p *Parser) Process(ctx context.Context, entry *entry.Entry) error {
4150
// if pri header is missing and this is an expected behavior then facility and severity values should be skipped.

0 commit comments

Comments
 (0)