@@ -57,7 +57,6 @@ type InputOperator struct {
57
57
encoding encoding.Encoding
58
58
59
59
wg sync.WaitGroup
60
- readerWg sync.WaitGroup
61
60
firstCheck bool
62
61
cancel context.CancelFunc
63
62
}
@@ -111,25 +110,26 @@ func (f *InputOperator) startPoller(ctx context.Context) {
111
110
112
111
// poll checks all the watched paths for new entries
113
112
func (f * InputOperator ) poll (ctx context.Context ) {
114
-
115
113
var matches []string
116
114
if len (f .queuedMatches ) > f .MaxConcurrentFiles {
117
115
matches , f .queuedMatches = f .queuedMatches [:f .MaxConcurrentFiles ], f .queuedMatches [f .MaxConcurrentFiles :]
118
- } else if len (f .queuedMatches ) > 0 {
119
- matches , f .queuedMatches = f .queuedMatches , make ([]string , 0 )
120
116
} else {
121
- // Increment the generation on all known readers
122
- // This is done here because the next generation is about to start
123
- for i := 0 ; i < len (f .knownFiles ); i ++ {
124
- f .knownFiles [i ].generation ++
125
- }
117
+ if len (f .queuedMatches ) > 0 {
118
+ matches , f .queuedMatches = f .queuedMatches , make ([]string , 0 )
119
+ } else {
120
+ // Increment the generation on all known readers
121
+ // This is done here because the next generation is about to start
122
+ for i := 0 ; i < len (f .knownFiles ); i ++ {
123
+ f .knownFiles [i ].generation ++
124
+ }
126
125
127
- // Get the list of paths on disk
128
- matches = getMatches (f .Include , f .Exclude )
129
- if f .firstCheck && len (matches ) == 0 {
130
- f .Warnw ("no files match the configured include patterns" , "include" , f .Include )
131
- } else if len (matches ) > f .MaxConcurrentFiles {
132
- matches , f .queuedMatches = matches [:f .MaxConcurrentFiles ], matches [f .MaxConcurrentFiles :]
126
+ // Get the list of paths on disk
127
+ matches = getMatches (f .Include , f .Exclude )
128
+ if f .firstCheck && len (matches ) == 0 {
129
+ f .Warnw ("no files match the configured include patterns" , "include" , f .Include )
130
+ } else if len (matches ) > f .MaxConcurrentFiles {
131
+ matches , f .queuedMatches = matches [:f .MaxConcurrentFiles ], matches [f .MaxConcurrentFiles :]
132
+ }
133
133
}
134
134
}
135
135
@@ -229,7 +229,6 @@ OUTER:
229
229
// Empty file, don't read it until we can compare its fingerprint
230
230
fps = append (fps [:i ], fps [i + 1 :]... )
231
231
filesCopy = append (filesCopy [:i ], filesCopy [i + 1 :]... )
232
-
233
232
}
234
233
235
234
for j := 0 ; j < len (fps ); j ++ {
@@ -267,9 +266,7 @@ OUTER:
267
266
// before clearing out readers that have existed for 3 generations.
268
267
func (f * InputOperator ) saveCurrent (readers []* Reader ) {
269
268
// Add readers from the current, completed poll interval to the list of known files
270
- for _ , reader := range readers {
271
- f .knownFiles = append (f .knownFiles , reader )
272
- }
269
+ f .knownFiles = append (f .knownFiles , readers ... )
273
270
274
271
// Clear out old readers. They are sorted such that they are oldest first,
275
272
// so we can just find the first reader whose generation is less than our
0 commit comments