Skip to content

Commit a584e0a

Browse files
refactor: introduce batching in Reader, but call emit function for each entry still
Next step is to change the emit function to accept a batch.
1 parent 2a3fbd0 commit a584e0a

File tree

2 files changed

+47
-6
lines changed

2 files changed

+47
-6
lines changed

pkg/stanza/fileconsumer/internal/reader/factory.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ import (
2424
)
2525

2626
const (
27-
DefaultMaxLogSize = 1024 * 1024
28-
DefaultFlushPeriod = 500 * time.Millisecond
27+
DefaultMaxLogSize = 1024 * 1024
28+
DefaultFlushPeriod = 500 * time.Millisecond
29+
DefaultMaxBatchSize = 100
2930
)
3031

3132
type Factory struct {
@@ -78,6 +79,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
7879
includeFileRecordNum: f.IncludeFileRecordNumber,
7980
compression: f.Compression,
8081
acquireFSLock: f.AcquireFSLock,
82+
maxBatchSize: DefaultMaxBatchSize,
8183
}
8284
r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))
8385

pkg/stanza/fileconsumer/internal/reader/reader.go

+43-4
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type Reader struct {
5252
includeFileRecordNum bool
5353
compression string
5454
acquireFSLock bool
55+
maxBatchSize int
5556
}
5657

5758
// ReadToEnd will read until the end of the file
@@ -179,6 +180,8 @@ func (r *Reader) readContents(ctx context.Context) {
179180
// Create the scanner to read the contents of the file.
180181
s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.contentSplitFunc)
181182

183+
tokens := make([]emit.Token, 0, r.maxBatchSize)
184+
182185
// Iterate over the contents of the file.
183186
for {
184187
select {
@@ -194,7 +197,7 @@ func (r *Reader) readContents(ctx context.Context) {
194197
} else if r.deleteAtEOF {
195198
r.delete()
196199
}
197-
return
200+
break
198201
}
199202

200203
token, err := r.decoder.Decode(s.Bytes())
@@ -209,15 +212,51 @@ func (r *Reader) readContents(ctx context.Context) {
209212
r.FileAttributes[attrs.LogFileRecordNumber] = r.RecordNum
210213
}
211214

212-
err = r.emitFunc(ctx, emit.NewToken(token, r.FileAttributes))
213-
if err != nil {
214-
r.set.Logger.Error("failed to process token", zap.Error(err))
215+
tokens = append(tokens, emit.NewToken(copyBody(token), copyAttributes(r.FileAttributes)))
216+
217+
if r.maxBatchSize > 0 && len(tokens) >= r.maxBatchSize {
218+
for _, t := range tokens {
219+
err := r.emitFunc(ctx, t)
220+
if err != nil {
221+
r.set.Logger.Error("failed to process token", zap.Error(err))
222+
}
223+
}
224+
tokens = tokens[:0]
225+
r.Offset = s.Pos()
215226
}
227+
}
216228

229+
if len(tokens) > 0 {
230+
for _, t := range tokens {
231+
err := r.emitFunc(ctx, t)
232+
if err != nil {
233+
r.set.Logger.Error("failed to process token", zap.Error(err))
234+
}
235+
}
217236
r.Offset = s.Pos()
218237
}
219238
}
220239

240+
// copyBody returns a detached copy of body so a batched token does not
// alias the scanner's reusable buffer. A nil input is returned as nil.
func copyBody(body []byte) []byte {
	if body == nil {
		return nil
	}
	// Allocate exactly len(body) and append into it; equivalent to
	// make+copy but expressed as a single append.
	return append(make([]byte, 0, len(body)), body...)
}
248+
249+
// copyAttributes returns a shallow copy of attrs so later mutations of the
// reader's attribute map do not leak into tokens already held in the batch.
// A nil map is returned as nil; values are shared, not deep-copied.
func copyAttributes(attrs map[string]any) map[string]any {
	if attrs == nil {
		return nil
	}
	out := make(map[string]any, len(attrs))
	for key, value := range attrs {
		out[key] = value
	}
	return out
}
259+
221260
// Delete will close and delete the file
222261
func (r *Reader) delete() {
223262
r.close()

0 commit comments

Comments (0)