Skip to content

Commit c206995

Browse files
change emit.Callback signature to accept a slice of tokens
But still emit each Entry individually into the Stanza pipeline
1 parent 6b4c9fe commit c206995

File tree

10 files changed

+84
-60
lines changed

10 files changed

+84
-60
lines changed

pkg/stanza/fileconsumer/benchmark_test.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,12 @@ func BenchmarkFileInput(b *testing.B) {
188188
cfg.PollInterval = time.Microsecond
189189

190190
doneChan := make(chan bool, len(files))
191-
callback := func(_ context.Context, token emit.Token) error {
192-
if len(token.Body) == 0 {
193-
doneChan <- true
191+
callback := func(_ context.Context, tokens []emit.Token) error {
192+
for _, token := range tokens {
193+
if len(token.Body) == 0 {
194+
doneChan <- true
195+
break
196+
}
194197
}
195198
return nil
196199
}

pkg/stanza/fileconsumer/emit/emit.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"context"
88
)
99

10-
type Callback func(ctx context.Context, token Token) error
10+
type Callback func(ctx context.Context, tokens []Token) error
1111

1212
type Token struct {
1313
Body []byte

pkg/stanza/fileconsumer/internal/emittest/nop.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ import (
99
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
1010
)
1111

12-
func Nop(_ context.Context, _ emit.Token) error {
12+
func Nop(_ context.Context, _ []emit.Token) error {
1313
return nil
1414
}

pkg/stanza/fileconsumer/internal/emittest/nop_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ import (
1313
)
1414

1515
func TestNop(t *testing.T) {
16-
require.NoError(t, Nop(context.Background(), emit.Token{}))
16+
require.NoError(t, Nop(context.Background(), []emit.Token{}))
1717
}

pkg/stanza/fileconsumer/internal/emittest/sink.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,15 @@ func NewSink(opts ...SinkOpt) *Sink {
5757
return &Sink{
5858
emitChan: emitChan,
5959
timeout: cfg.timeout,
60-
Callback: func(ctx context.Context, token emit.Token) error {
61-
copied := make([]byte, len(token.Body))
62-
copy(copied, token.Body)
63-
select {
64-
case <-ctx.Done():
65-
return ctx.Err()
66-
case emitChan <- &Call{copied, token.Attributes}:
60+
Callback: func(ctx context.Context, tokens []emit.Token) error {
61+
for _, token := range tokens {
62+
copied := make([]byte, len(token.Body))
63+
copy(copied, token.Body)
64+
select {
65+
case <-ctx.Done():
66+
return ctx.Err()
67+
case emitChan <- &Call{copied, token.Attributes}:
68+
}
6769
}
6870
return nil
6971
},

pkg/stanza/fileconsumer/internal/emittest/sink_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) {
204204
}
205205
go func() {
206206
for _, c := range testCalls {
207-
assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Token, c.Attrs)))
207+
assert.NoError(t, s.Callback(context.Background(), []emit.Token{emit.NewToken(c.Token, c.Attrs)}))
208208
}
209209
}()
210210
return s, testCalls

pkg/stanza/fileconsumer/internal/reader/reader.go

+6-10
Original file line numberDiff line numberDiff line change
@@ -215,23 +215,19 @@ func (r *Reader) readContents(ctx context.Context) {
215215
tokens = append(tokens, emit.NewToken(copyBody(token), copyAttributes(r.FileAttributes)))
216216

217217
if r.maxBatchSize > 0 && len(tokens) >= r.maxBatchSize {
218-
for _, t := range tokens {
219-
err := r.emitFunc(ctx, t)
220-
if err != nil {
221-
r.set.Logger.Error("failed to emit token", zap.Error(err))
222-
}
218+
err := r.emitFunc(ctx, tokens)
219+
if err != nil {
220+
r.set.Logger.Error("failed to emit token", zap.Error(err))
223221
}
224222
tokens = tokens[:0]
225223
r.Offset = s.Pos()
226224
}
227225
}
228226

229227
if len(tokens) > 0 {
230-
for _, t := range tokens {
231-
err := r.emitFunc(ctx, t)
232-
if err != nil {
233-
r.set.Logger.Error("failed to emit token", zap.Error(err))
234-
}
228+
err := r.emitFunc(ctx, tokens)
229+
if err != nil {
230+
r.set.Logger.Error("failed to emit token", zap.Error(err))
235231
}
236232
r.Offset = s.Pos()
237233
}

pkg/stanza/operator/input/file/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
6060
toBody: toBody,
6161
}
6262

63-
input.fileConsumer, err = c.Config.Build(set, input.emit)
63+
input.fileConsumer, err = c.Config.Build(set, input.emitBatch)
6464
if err != nil {
6565
return nil, err
6666
}

pkg/stanza/operator/input/file/input.go

+15
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package file // import "github.com/open-telemetry/opentelemetry-collector-contri
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910

1011
"go.uber.org/zap"
@@ -37,6 +38,20 @@ func (i *Input) Stop() error {
3738
return i.fileConsumer.Stop()
3839
}
3940

41+
func (i *Input) emitBatch(ctx context.Context, tokens []emit.Token) error {
42+
var errs []error
43+
for _, token := range tokens {
44+
err := i.emit(ctx, token)
45+
if err != nil {
46+
errs = append(errs, err)
47+
}
48+
}
49+
if len(errs) > 0 {
50+
return errors.Join(errs...)
51+
}
52+
return nil
53+
}
54+
4055
func (i *Input) emit(ctx context.Context, token emit.Token) error {
4156
if len(token.Body) == 0 {
4257
return nil

receiver/otlpjsonfilereceiver/file.go

+43-35
Original file line numberDiff line numberDiff line change
@@ -83,18 +83,20 @@ func createLogsReceiver(_ context.Context, settings receiver.Settings, configura
8383
if cfg.ReplayFile {
8484
opts = append(opts, fileconsumer.WithNoTracking())
8585
}
86-
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error {
87-
ctx = obsrecv.StartLogsOp(ctx)
88-
var l plog.Logs
89-
l, err = logsUnmarshaler.UnmarshalLogs(token.Body)
90-
if err != nil {
91-
obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, err)
92-
} else {
93-
logRecordCount := l.LogRecordCount()
94-
if logRecordCount != 0 {
95-
err = logs.ConsumeLogs(ctx, l)
86+
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, tokens []emit.Token) error {
87+
for _, token := range tokens {
88+
ctx = obsrecv.StartLogsOp(ctx)
89+
var l plog.Logs
90+
l, err = logsUnmarshaler.UnmarshalLogs(token.Body)
91+
if err != nil {
92+
obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, err)
93+
} else {
94+
logRecordCount := l.LogRecordCount()
95+
if logRecordCount != 0 {
96+
err = logs.ConsumeLogs(ctx, l)
97+
}
98+
obsrecv.EndLogsOp(ctx, metadata.Type.String(), logRecordCount, err)
9699
}
97-
obsrecv.EndLogsOp(ctx, metadata.Type.String(), logRecordCount, err)
98100
}
99101
return nil
100102
}, opts...)
@@ -120,17 +122,19 @@ func createMetricsReceiver(_ context.Context, settings receiver.Settings, config
120122
if cfg.ReplayFile {
121123
opts = append(opts, fileconsumer.WithNoTracking())
122124
}
123-
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error {
124-
ctx = obsrecv.StartMetricsOp(ctx)
125-
var m pmetric.Metrics
126-
m, err = metricsUnmarshaler.UnmarshalMetrics(token.Body)
127-
if err != nil {
128-
obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, err)
129-
} else {
130-
if m.ResourceMetrics().Len() != 0 {
131-
err = metrics.ConsumeMetrics(ctx, m)
125+
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, tokens []emit.Token) error {
126+
for _, token := range tokens {
127+
ctx = obsrecv.StartMetricsOp(ctx)
128+
var m pmetric.Metrics
129+
m, err = metricsUnmarshaler.UnmarshalMetrics(token.Body)
130+
if err != nil {
131+
obsrecv.EndMetricsOp(ctx, metadata.Type.String(), 0, err)
132+
} else {
133+
if m.ResourceMetrics().Len() != 0 {
134+
err = metrics.ConsumeMetrics(ctx, m)
135+
}
136+
obsrecv.EndMetricsOp(ctx, metadata.Type.String(), m.MetricCount(), err)
132137
}
133-
obsrecv.EndMetricsOp(ctx, metadata.Type.String(), m.MetricCount(), err)
134138
}
135139
return nil
136140
}, opts...)
@@ -156,17 +160,19 @@ func createTracesReceiver(_ context.Context, settings receiver.Settings, configu
156160
if cfg.ReplayFile {
157161
opts = append(opts, fileconsumer.WithNoTracking())
158162
}
159-
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error {
160-
ctx = obsrecv.StartTracesOp(ctx)
161-
var t ptrace.Traces
162-
t, err = tracesUnmarshaler.UnmarshalTraces(token.Body)
163-
if err != nil {
164-
obsrecv.EndTracesOp(ctx, metadata.Type.String(), 0, err)
165-
} else {
166-
if t.ResourceSpans().Len() != 0 {
167-
err = traces.ConsumeTraces(ctx, t)
163+
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, tokens []emit.Token) error {
164+
for _, token := range tokens {
165+
ctx = obsrecv.StartTracesOp(ctx)
166+
var t ptrace.Traces
167+
t, err = tracesUnmarshaler.UnmarshalTraces(token.Body)
168+
if err != nil {
169+
obsrecv.EndTracesOp(ctx, metadata.Type.String(), 0, err)
170+
} else {
171+
if t.ResourceSpans().Len() != 0 {
172+
err = traces.ConsumeTraces(ctx, t)
173+
}
174+
obsrecv.EndTracesOp(ctx, metadata.Type.String(), t.SpanCount(), err)
168175
}
169-
obsrecv.EndTracesOp(ctx, metadata.Type.String(), t.SpanCount(), err)
170176
}
171177
return nil
172178
}, opts...)
@@ -184,10 +190,12 @@ func createProfilesReceiver(_ context.Context, settings receiver.Settings, confi
184190
if cfg.ReplayFile {
185191
opts = append(opts, fileconsumer.WithNoTracking())
186192
}
187-
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, token emit.Token) error {
188-
p, _ := profilesUnmarshaler.UnmarshalProfiles(token.Body)
189-
if p.ResourceProfiles().Len() != 0 {
190-
_ = profiles.ConsumeProfiles(ctx, p)
193+
input, err := cfg.Config.Build(settings.TelemetrySettings, func(ctx context.Context, tokens []emit.Token) error {
194+
for _, token := range tokens {
195+
p, _ := profilesUnmarshaler.UnmarshalProfiles(token.Body)
196+
if p.ResourceProfiles().Len() != 0 {
197+
_ = profiles.ConsumeProfiles(ctx, p)
198+
}
191199
}
192200
return nil
193201
}, opts...)

0 commit comments

Comments
 (0)