Commit ed302a3
[receiver/filelog] fix record counting with header (#35870)
#### Description

Fixes #35869 by refactoring the `Reader::ReadToEnd` method, separating reading of the file's header from reading of the file's contents. This results in very similar code in the `readHeader` and `readContents` methods, which was previously deduplicated at the cost of slightly higher complexity. The bug could have been fixed without separating header reading from contents reading, but I hope this separation will make it easier to implement content batching in the Reader (#35455). Content batching was my original motivation for these code changes; I only discovered the problem with record counting while reading the code.

#### Link to tracking issue

Fixes #35869

#### Testing

In the first commit I added tests that document the erroneous behavior. In the second commit I fixed the bug and corrected the tests.

#### Documentation

Added a changelog entry.
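In outline, the refactored flow looks like this (a condensed sketch only; the stat/seek/gzip handling of the real method is elided, and the authoritative version is in the `reader.go` diff below):

```go
// Sketch of the refactored ReadToEnd: header consumption is now a separate
// phase, so header lines no longer pass through the content-processing path
// that drives record counting.
func (r *Reader) ReadToEnd(ctx context.Context) {
	// ... stat/seek/gzip handling elided ...
	if r.headerReader != nil {
		if doneReadingFile := r.readHeader(ctx); doneReadingFile {
			return
		}
		// readHeader has consumed the header tokens, switched splitFunc and
		// processFunc to their normal values, and re-seeked to r.Offset.
	}
	r.readContents(ctx)
}
```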
1 parent 1e41cec commit ed302a3

File tree (4 files changed: +187 -55 lines)

- (new changelog entry)
- pkg/stanza/fileconsumer/config_test.go
- pkg/stanza/fileconsumer/file_test.go
- pkg/stanza/fileconsumer/internal/reader/reader.go
New changelog entry (+27)

@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: receiver/filelog
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: fix record counting with header
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35869]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []

pkg/stanza/fileconsumer/config_test.go (-6)
@@ -13,7 +13,6 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/featuregate"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
@@ -724,11 +723,6 @@ func TestBuildWithSplitFunc(t *testing.T) {
 }
 
 func TestBuildWithHeader(t *testing.T) {
-	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
-	t.Cleanup(func() {
-		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
-	})
-
 	basicConfig := func() *Config {
 		cfg := NewConfig()
 		cfg.Include = []string{"/var/log/testpath.*"}

pkg/stanza/fileconsumer/file_test.go (+75 -15)
@@ -1151,11 +1151,6 @@ func TestMaxBatching(t *testing.T) {
 // TestReadExistingLogsWithHeader tests that, when starting from beginning, we
 // read all the lines that are already there, and parses the headers
 func TestReadExistingLogsWithHeader(t *testing.T) {
-	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
-	t.Cleanup(func() {
-		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
-	})
-
 	tempDir := t.TempDir()
 	cfg := NewConfig().includeDir(tempDir)
 	cfg.StartAt = "beginning"
@@ -1247,11 +1242,6 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
 }
 
 func TestHeaderPersistance(t *testing.T) {
-	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
-	t.Cleanup(func() {
-		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
-	})
-
 	tempDir := t.TempDir()
 	cfg := NewConfig().includeDir(tempDir)
 	cfg.StartAt = "beginning"
@@ -1287,11 +1277,6 @@ func TestHeaderPersistance(t *testing.T) {
 }
 
 func TestHeaderPersistanceInHeader(t *testing.T) {
-	require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), true))
-	t.Cleanup(func() {
-		require.NoError(t, featuregate.GlobalRegistry().Set(AllowHeaderMetadataParsing.ID(), false))
-	})
-
 	tempDir := t.TempDir()
 	cfg1 := NewConfig().includeDir(tempDir)
 	cfg1.StartAt = "beginning"
@@ -1598,3 +1583,78 @@ func TestReadGzipCompressedLogsFromEnd(t *testing.T) {
 	operator.poll(context.TODO())
 	sink.ExpectToken(t, []byte("testlog4"))
 }
+
+func TestIncludeFileRecordNumber(t *testing.T) {
+	t.Parallel()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.IncludeFileRecordNumber = true
+	operator, sink := testManager(t, cfg)
+
+	// Create a file, then start
+	temp := filetest.OpenTemp(t, tempDir)
+	filetest.WriteString(t, temp, "testlog1\n")
+
+	require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	sink.ExpectCall(t, []byte("testlog1"), map[string]any{
+		attrs.LogFileName:         filepath.Base(temp.Name()),
+		attrs.LogFileRecordNumber: int64(1),
+	})
+}
+
+func TestIncludeFileRecordNumberWithHeaderConfigured(t *testing.T) {
+	t.Parallel()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.IncludeFileRecordNumber = true
+	cfg = cfg.withHeader("^#", "(?P<header_attr>[A-z]+)")
+	operator, sink := testManager(t, cfg)
+
+	// Create a file, then start
+	temp := filetest.OpenTemp(t, tempDir)
+	filetest.WriteString(t, temp, "#abc\n#xyz: headerValue2\ntestlog1\n")
+
+	require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	sink.ExpectCall(t, []byte("testlog1"), map[string]any{
+		attrs.LogFileName:         filepath.Base(temp.Name()),
+		attrs.LogFileRecordNumber: int64(1),
+		"header_attr":             "xyz",
+	})
+}
+
+func TestIncludeFileRecordNumberWithHeaderConfiguredButMissing(t *testing.T) {
+	t.Parallel()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.IncludeFileRecordNumber = true
+	cfg = cfg.withHeader("^#", "(?P<header_key>[A-z]+): (?P<header_value>[A-z]+)")
+	operator, sink := testManager(t, cfg)
+
+	// Create a file, then start
+	temp := filetest.OpenTemp(t, tempDir)
+	filetest.WriteString(t, temp, "testlog1\n")
+
+	require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	sink.ExpectCall(t, []byte("testlog1"), map[string]any{
+		attrs.LogFileName:         filepath.Base(temp.Name()),
+		attrs.LogFileRecordNumber: int64(1),
+	})
+}
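The three tests above pin the fixed behavior: the first content record carries `log.file.record_number` 1 (asserted via `attrs.LogFileRecordNumber`), whether a header is configured or not, and whether or not a header is actually present in the file. In end-user terms, the tested scenario corresponds to a filelog receiver configuration along these lines (an illustrative sketch only; the option names follow the receiver's documented configuration, while the path and regexes here are invented for the example):

```yaml
receivers:
  filelog:
    include: [/var/log/app/*.log]
    start_at: beginning
    include_file_record_number: true # adds log.file.record_number to each record
    header:
      pattern: "^#" # lines matching this regex are treated as header lines
      metadata_operators:
        - type: regex_parser
          regex: "(?P<header_attr>[A-z]+)"
```

With this configuration, header lines are parsed for metadata but no longer count as records, so the first content line after a two-line header is numbered 1 instead of being offset by the header lines.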

pkg/stanza/fileconsumer/internal/reader/reader.go (+85 -34)
@@ -70,7 +70,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 	// SectionReader can only read a fixed window (from previous offset to EOF).
 	info, err := r.file.Stat()
 	if err != nil {
-		r.set.Logger.Error("Failed to stat", zap.Error(err))
+		r.set.Logger.Error("failed to stat", zap.Error(err))
 		return
 	}
 	currentEOF := info.Size()
@@ -80,7 +80,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 	gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
 	if err != nil {
 		if !errors.Is(err, io.EOF) {
-			r.set.Logger.Error("Failed to create gzip reader", zap.Error(err))
+			r.set.Logger.Error("failed to create gzip reader", zap.Error(err))
 		}
 		return
 	} else {
@@ -96,7 +96,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 	}
 
 	if _, err := r.file.Seek(r.Offset, 0); err != nil {
-		r.set.Logger.Error("Failed to seek", zap.Error(err))
+		r.set.Logger.Error("failed to seek", zap.Error(err))
 		return
 	}
 
@@ -106,9 +106,85 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 		}
 	}()
 
+	if r.headerReader != nil {
+		if r.readHeader(ctx) {
+			return
+		}
+	}
+
+	r.readContents(ctx)
+}
+
+func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
+	s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)
+
+	// Read the tokens from the file until no more header tokens are found or the end of file is reached.
+	for {
+		select {
+		case <-ctx.Done():
+			return true
+		default:
+		}
+
+		ok := s.Scan()
+		if !ok {
+			if err := s.Error(); err != nil {
+				r.set.Logger.Error("failed during header scan", zap.Error(err))
+			} else {
+				r.set.Logger.Debug("end of file reached", zap.Bool("delete_at_eof", r.deleteAtEOF))
+				if r.deleteAtEOF {
+					r.delete()
+				}
+			}
+			// Either end of file was reached, or file cannot be scanned.
+			return true
+		}
+
+		token, err := r.decoder.Decode(s.Bytes())
+		if err != nil {
+			r.set.Logger.Error("failed to decode header token", zap.Error(err))
+			r.Offset = s.Pos() // move past the bad token or we may be stuck
+			continue
+		}
+
+		err = r.headerReader.Process(ctx, token, r.FileAttributes)
+		if err != nil {
+			if errors.Is(err, header.ErrEndOfHeader) {
+				// End of header reached.
+				break
+			}
+			r.set.Logger.Error("failed to process header token", zap.Error(err))
+		}
+
+		r.Offset = s.Pos()
+	}
+
+	// Clean up the header machinery
+	if err := r.headerReader.Stop(); err != nil {
+		r.set.Logger.Error("failed to stop header pipeline during finalization", zap.Error(err))
+	}
+	r.headerReader = nil
+	r.HeaderFinalized = true
+	r.initialBufferSize = scanner.DefaultBufferSize
+
+	// Switch to the normal split and process functions.
+	r.splitFunc = r.lineSplitFunc
+	r.processFunc = r.emitFunc
+
+	// Reset position in file to r.Offset after the header scanner might have moved it past a content token.
+	if _, err := r.file.Seek(r.Offset, 0); err != nil {
+		r.set.Logger.Error("failed to seek post-header", zap.Error(err))
+		return true
+	}
+
+	return false
+}
+
+func (r *Reader) readContents(ctx context.Context) {
+	// Create the scanner to read the contents of the file.
 	s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc)
 
-	// Iterate over the tokenized file, emitting entries as we go
+	// Iterate over the contents of the file.
 	for {
 		select {
 		case <-ctx.Done():
@@ -119,7 +195,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 		ok := s.Scan()
 		if !ok {
 			if err := s.Error(); err != nil {
-				r.set.Logger.Error("Failed during scan", zap.Error(err))
+				r.set.Logger.Error("failed during scan", zap.Error(err))
 			} else if r.deleteAtEOF {
 				r.delete()
 			}
@@ -128,7 +204,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 
 		token, err := r.decoder.Decode(s.Bytes())
 		if err != nil {
-			r.set.Logger.Error("Failed to decode token", zap.Error(err))
+			r.set.Logger.Error("failed to decode token", zap.Error(err))
 			r.Offset = s.Pos() // move past the bad token or we may be stuck
 			continue
 		}
@@ -139,36 +215,11 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 		}
 
 		err = r.processFunc(ctx, token, r.FileAttributes)
-		if err == nil {
-			r.Offset = s.Pos() // successful emit, update offset
-			continue
-		}
-
-		if !errors.Is(err, header.ErrEndOfHeader) {
-			r.set.Logger.Error("Failed to process token", zap.Error(err))
-			r.Offset = s.Pos() // move past the bad token or we may be stuck
-			continue
+		if err != nil {
+			r.set.Logger.Error("failed to process token", zap.Error(err))
 		}
 
-		// Clean up the header machinery
-		if err = r.headerReader.Stop(); err != nil {
-			r.set.Logger.Error("Failed to stop header pipeline during finalization", zap.Error(err))
-		}
-		r.headerReader = nil
-		r.HeaderFinalized = true
-
-		// Switch to the normal split and process functions.
-		r.splitFunc = r.lineSplitFunc
-		r.processFunc = r.emitFunc
-
-		// Recreate the scanner with the normal split func.
-		// Do not use the updated offset from the old scanner, as the most recent token
-		// could be split differently with the new splitter.
-		if _, err = r.file.Seek(r.Offset, 0); err != nil {
-			r.set.Logger.Error("Failed to seek post-header", zap.Error(err))
-			return
-		}
-		s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)
+		r.Offset = s.Pos()
 	}
 }