@@ -5,10 +5,12 @@ package fileconsumer
5
5
6
6
import (
7
7
"context"
8
+ "fmt"
8
9
"os"
9
10
"path/filepath"
10
11
"sync"
11
12
"testing"
13
+ "time"
12
14
13
15
"github.com/stretchr/testify/require"
14
16
@@ -23,22 +25,6 @@ type fileInputBenchmark struct {
23
25
config func () * Config
24
26
}
25
27
26
- type benchFile struct {
27
- * os.File
28
- log func (int )
29
- }
30
-
31
- func simpleTextFile (b * testing.B , file * os.File ) * benchFile {
32
- line := string (filetest .TokenWithLength (49 )) + "\n "
33
- return & benchFile {
34
- File : file ,
35
- log : func (_ int ) {
36
- _ , err := file .WriteString (line )
37
- require .NoError (b , err )
38
- },
39
- }
40
- }
41
-
42
28
func BenchmarkFileInput (b * testing.B ) {
43
29
cases := []fileInputBenchmark {
44
30
{
@@ -144,63 +130,98 @@ func BenchmarkFileInput(b *testing.B) {
144
130
return cfg
145
131
},
146
132
},
133
+ {
134
+ name : "Many" ,
135
+ paths : func () []string {
136
+ paths := make ([]string , 100 )
137
+ for i := range paths {
138
+ paths [i ] = fmt .Sprintf ("file%d.log" , i )
139
+ }
140
+ return paths
141
+ }(),
142
+ config : func () * Config {
143
+ cfg := NewConfig ()
144
+ cfg .Include = []string {"file*.log" }
145
+ cfg .MaxConcurrentFiles = 100
146
+ return cfg
147
+ },
148
+ },
149
+ }
150
+
151
+ // Pregenerate some lines which we can write to the files
152
+ // to avoid measuring the time it takes to generate them
153
+ // and to reduce the number of syscalls in the benchmark.
154
+ uniqueLines := 10
155
+ severalLines := ""
156
+ for i := 0 ; i < uniqueLines ; i ++ {
157
+ severalLines += string (filetest .TokenWithLength (999 )) + "\n "
147
158
}
148
159
149
160
for _ , bench := range cases {
150
161
b .Run (bench .name , func (b * testing.B ) {
151
162
rootDir := b .TempDir ()
152
163
153
- var files []* benchFile
164
+ var files []* os. File
154
165
for _ , path := range bench .paths {
155
- file := filetest .OpenFile (b , filepath .Join (rootDir , path ))
156
- files = append (files , simpleTextFile (b , file ))
166
+ f := filetest .OpenFile (b , filepath .Join (rootDir , path ))
167
+ // Initialize the file to ensure a unique fingerprint
168
+ _ , err := f .WriteString (f .Name () + "\n " )
169
+ require .NoError (b , err )
170
+ // Write half the content before starting the benchmark
171
+ for i := 0 ; i < b .N / 2 ; i ++ {
172
+ _ , err := f .WriteString (severalLines )
173
+ require .NoError (b , err )
174
+ }
175
+ require .NoError (b , f .Sync ())
176
+ files = append (files , f )
157
177
}
158
178
159
179
cfg := bench .config ()
160
180
for i , inc := range cfg .Include {
161
181
cfg .Include [i ] = filepath .Join (rootDir , inc )
162
182
}
163
183
cfg .StartAt = "beginning"
184
+ // Use an aggressive poll interval so we're not measuring excess sleep time
185
+ cfg .PollInterval = time .Microsecond
164
186
165
- received := make (chan [] byte )
187
+ doneChan := make (chan bool , len ( files ) )
166
188
callback := func (_ context.Context , token []byte , _ map [string ]any ) error {
167
- received <- token
189
+ if len (token ) == 0 {
190
+ doneChan <- true
191
+ }
168
192
return nil
169
193
}
170
194
op , err := cfg .Build (testutil .Logger (b ), callback )
171
195
require .NoError (b , err )
172
196
173
- // write half the lines before starting
174
- mid := b .N / 2
175
- for i := 0 ; i < mid ; i ++ {
176
- for _ , file := range files {
177
- file .log (i )
178
- }
179
- }
180
-
181
197
b .ResetTimer ()
182
- err = op .Start (testutil .NewUnscopedMockPersister ())
198
+ require . NoError ( b , op .Start (testutil .NewUnscopedMockPersister () ))
183
199
defer func () {
184
200
require .NoError (b , op .Stop ())
185
201
}()
186
- require .NoError (b , err )
187
202
188
- // write the remainder of lines while running
189
203
var wg sync.WaitGroup
190
- wg .Add (1 )
191
- go func () {
192
- for i := mid ; i < b .N ; i ++ {
193
- for _ , file := range files {
194
- file .log (i )
204
+ for _ , file := range files {
205
+ wg .Add (1 )
206
+ go func (f * os.File ) {
207
+ defer wg .Done ()
208
+ // Write the other half of the content while running
209
+ for i := 0 ; i < b .N / 2 ; i ++ {
210
+ _ , err := f .WriteString (severalLines )
211
+ require .NoError (b , err )
195
212
}
196
- }
197
- wg .Done ()
198
- }()
199
- wg .Wait ()
213
+ // Signal end of file
214
+ _ , err := f .WriteString ("\n " )
215
+ require .NoError (b , err )
216
+ require .NoError (b , f .Sync ())
217
+ }(file )
218
+ }
200
219
201
- for i := 0 ; i < b .N * len (files ); i ++ {
202
- <- received
220
+ // Timer continues to run until all files have been read
221
+ for dones := 0 ; dones < len (files ); dones ++ {
222
+ <- doneChan
203
223
}
224
+ wg .Wait ()
204
225
})
205
226
}
206
227
}
0 commit comments