Skip to content

Commit a571a49

Browse files
fix: fix record counting with header
This refactors the `Reader::ReadToEnd` method by separating reading the file's header from reading the file's contents. This results in very similar code in `readHeader` and `readContents` methods, which was previously deduplicated at the cost of slightly higher complexity. The bug could be fixed without separating header reading from contents reading, but I hope this separation will make it easier to implement content batching in the Reader (open-telemetry#35455). Content batching was my original motivation for these code changes. I only discovered the problem with record counting when reading the code.
1 parent 91c936a commit a571a49

File tree

2 files changed

+92
-40
lines changed

2 files changed

+92
-40
lines changed

pkg/stanza/fileconsumer/file_test.go

+4-8
Original file line numberDiff line numberDiff line change
@@ -1643,10 +1643,8 @@ func TestIncludeFileRecordNumberWithHeaderConfigured(t *testing.T) {
16431643
}()
16441644

16451645
sink.ExpectCall(t, []byte("testlog1"), map[string]any{
1646-
attrs.LogFileName: filepath.Base(temp.Name()),
1647-
// The record number should be 1, but it's actually 4 = 1 + 2 header lines + 1
1648-
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35869
1649-
attrs.LogFileRecordNumber: int64(4),
1646+
attrs.LogFileName: filepath.Base(temp.Name()),
1647+
attrs.LogFileRecordNumber: int64(1),
16501648
"header_attr": "xyz",
16511649
})
16521650
}
@@ -1671,9 +1669,7 @@ func TestIncludeFileRecordNumberWithHeaderConfiguredButMissing(t *testing.T) {
16711669
}()
16721670

16731671
sink.ExpectCall(t, []byte("testlog1"), map[string]any{
1674-
attrs.LogFileName: filepath.Base(temp.Name()),
1675-
// The record number should be 1, but it's actually 2 = 1 + 1
1676-
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35869
1677-
attrs.LogFileRecordNumber: int64(2),
1672+
attrs.LogFileName: filepath.Base(temp.Name()),
1673+
attrs.LogFileRecordNumber: int64(1),
16781674
})
16791675
}

pkg/stanza/fileconsumer/internal/reader/reader.go

+88-32
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
7070
// SectionReader can only read a fixed window (from previous offset to EOF).
7171
info, err := r.file.Stat()
7272
if err != nil {
73-
r.set.Logger.Error("Failed to stat", zap.Error(err))
73+
r.set.Logger.Error("failed to stat", zap.Error(err))
7474
return
7575
}
7676
currentEOF := info.Size()
@@ -80,7 +80,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
8080
gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
8181
if err != nil {
8282
if !errors.Is(err, io.EOF) {
83-
r.set.Logger.Error("Failed to create gzip reader", zap.Error(err))
83+
r.set.Logger.Error("failed to create gzip reader", zap.Error(err))
8484
}
8585
return
8686
} else {
@@ -96,7 +96,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
9696
}
9797

9898
if _, err := r.file.Seek(r.Offset, 0); err != nil {
99-
r.set.Logger.Error("Failed to seek", zap.Error(err))
99+
r.set.Logger.Error("failed to seek", zap.Error(err))
100100
return
101101
}
102102

@@ -106,9 +106,90 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
106106
}
107107
}()
108108

109+
doneReadingFile := r.readHeader(ctx)
110+
if doneReadingFile {
111+
return
112+
}
113+
114+
// Reset position in file to r.Offset after the header scanner might have moved it past a content token.
115+
if _, err := r.file.Seek(r.Offset, 0); err != nil {
116+
r.set.Logger.Error("failed to seek post-header", zap.Error(err))
117+
return
118+
}
119+
120+
r.readContents(ctx)
121+
}
122+
123+
func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
124+
if r.headerReader == nil {
125+
r.set.Logger.Debug("no need to read header", zap.Bool("header_finalized", r.HeaderFinalized))
126+
return false
127+
}
128+
109129
s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)
110130

111-
// Iterate over the tokenized file, emitting entries as we go
131+
// Read the tokens from the file until no more header tokens are found or the end of file is reached.
132+
for {
133+
select {
134+
case <-ctx.Done():
135+
return true
136+
default:
137+
}
138+
139+
ok := s.Scan()
140+
if !ok {
141+
if err := s.Error(); err != nil {
142+
r.set.Logger.Error("failed during header scan", zap.Error(err))
143+
} else {
144+
r.set.Logger.Debug("end of file reached", zap.Bool("delete_at_eof", r.deleteAtEOF))
145+
if r.deleteAtEOF {
146+
r.delete()
147+
}
148+
}
149+
// Either end of file was reached, or file cannot be scanned.
150+
return true
151+
}
152+
153+
token, err := r.decoder.Decode(s.Bytes())
154+
if err != nil {
155+
r.set.Logger.Error("decode header: %w", zap.Error(err))
156+
r.Offset = s.Pos() // move past the bad token or we may be stuck
157+
continue
158+
}
159+
160+
err = r.headerReader.Process(ctx, token, r.FileAttributes)
161+
if err != nil {
162+
if errors.Is(err, header.ErrEndOfHeader) {
163+
// End of header reached.
164+
break
165+
} else {
166+
r.set.Logger.Error("process header: %w", zap.Error(err))
167+
}
168+
}
169+
170+
r.Offset = s.Pos()
171+
}
172+
173+
// Clean up the header machinery
174+
if err := r.headerReader.Stop(); err != nil {
175+
r.set.Logger.Error("failed to stop header pipeline during finalization", zap.Error(err))
176+
}
177+
r.headerReader = nil
178+
r.HeaderFinalized = true
179+
r.initialBufferSize = scanner.DefaultBufferSize
180+
181+
// Switch to the normal split and process functions.
182+
r.splitFunc = r.lineSplitFunc
183+
r.processFunc = r.emitFunc
184+
185+
return false
186+
}
187+
188+
func (r *Reader) readContents(ctx context.Context) {
189+
// Create the scanner to read the contents of the file.
190+
s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)
191+
192+
// Iterate over the contents of the file.
112193
for {
113194
select {
114195
case <-ctx.Done():
@@ -119,7 +200,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
119200
ok := s.Scan()
120201
if !ok {
121202
if err := s.Error(); err != nil {
122-
r.set.Logger.Error("Failed during scan", zap.Error(err))
203+
r.set.Logger.Error("failed during scan", zap.Error(err))
123204
} else if r.deleteAtEOF {
124205
r.delete()
125206
}
@@ -139,36 +220,11 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
139220
}
140221

141222
err = r.processFunc(ctx, token, r.FileAttributes)
142-
if err == nil {
143-
r.Offset = s.Pos() // successful emit, update offset
144-
continue
145-
}
146-
147-
if !errors.Is(err, header.ErrEndOfHeader) {
223+
if err != nil {
148224
r.set.Logger.Error("process: %w", zap.Error(err))
149-
r.Offset = s.Pos() // move past the bad token or we may be stuck
150-
continue
151225
}
152226

153-
// Clean up the header machinery
154-
if err = r.headerReader.Stop(); err != nil {
155-
r.set.Logger.Error("Failed to stop header pipeline during finalization", zap.Error(err))
156-
}
157-
r.headerReader = nil
158-
r.HeaderFinalized = true
159-
160-
// Switch to the normal split and process functions.
161-
r.splitFunc = r.lineSplitFunc
162-
r.processFunc = r.emitFunc
163-
164-
// Recreate the scanner with the normal split func.
165-
// Do not use the updated offset from the old scanner, as the most recent token
166-
// could be split differently with the new splitter.
167-
if _, err = r.file.Seek(r.Offset, 0); err != nil {
168-
r.set.Logger.Error("Failed to seek post-header", zap.Error(err))
169-
return
170-
}
171-
s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)
227+
r.Offset = s.Pos()
172228
}
173229
}
174230

0 commit comments

Comments
 (0)