5
5
"fmt"
6
6
"net"
7
7
"strings"
8
+ "sync"
8
9
"time"
9
10
10
11
"github.com/cenkalti/backoff"
@@ -143,6 +144,9 @@ type client struct {
143
144
ipv4conn * ipv4.PacketConn
144
145
ipv6conn * ipv6.PacketConn
145
146
ifaces []net.Interface
147
+
148
+ mutex sync.Mutex
149
+ sentEntries map [string ]* ServiceEntry
146
150
}
147
151
148
152
// Client structure constructor
@@ -177,6 +181,28 @@ func newClient(opts clientOpts) (*client, error) {
177
181
}, nil
178
182
}
179
183
184
+ var cleanupFreq = 10 * time .Second
185
+
186
+ // clean up entries whose TTL expired
187
+ func (c * client ) cleanupSentEntries (ctx context.Context ) {
188
+ ticker := time .NewTicker (cleanupFreq )
189
+ defer ticker .Stop ()
190
+ for {
191
+ select {
192
+ case t := <- ticker .C :
193
+ c .mutex .Lock ()
194
+ for k , e := range c .sentEntries {
195
+ if t .After (e .Expiry ) {
196
+ delete (c .sentEntries , k )
197
+ }
198
+ }
199
+ c .mutex .Unlock ()
200
+ case <- ctx .Done ():
201
+ return
202
+ }
203
+ }
204
+ }
205
+
180
206
// Start listeners and waits for the shutdown signal from exit channel
181
207
func (c * client ) mainloop (ctx context.Context , params * lookupParams ) {
182
208
// start listening for responses
@@ -189,16 +215,20 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {
189
215
}
190
216
191
217
// Iterate through channels from listeners goroutines
192
- var entries , sentEntries map [string ]* ServiceEntry
193
- sentEntries = make (map [string ]* ServiceEntry )
218
+ var entries map [string ]* ServiceEntry
219
+ c .sentEntries = make (map [string ]* ServiceEntry )
220
+ go c .cleanupSentEntries (ctx )
221
+
194
222
for {
223
+ var now time.Time
195
224
select {
196
225
case <- ctx .Done ():
197
226
// Context expired. Notify subscriber that we are done here.
198
227
params .done ()
199
228
c .shutdown ()
200
229
return
201
230
case msg := <- msgCh :
231
+ now = time .Now ()
202
232
entries = make (map [string ]* ServiceEntry )
203
233
sections := append (msg .Answer , msg .Ns ... )
204
234
sections = append (sections , msg .Extra ... )
@@ -218,7 +248,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {
218
248
params .Service ,
219
249
params .Domain )
220
250
}
221
- entries [rr .Ptr ].TTL = rr .Hdr .Ttl
251
+ entries [rr .Ptr ].Expiry = now . Add ( time . Duration ( rr .Hdr .Ttl ) * time . Second )
222
252
case * dns.SRV :
223
253
if params .ServiceInstanceName () != "" && params .ServiceInstanceName () != rr .Hdr .Name {
224
254
continue
@@ -233,7 +263,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {
233
263
}
234
264
entries [rr .Hdr .Name ].HostName = rr .Target
235
265
entries [rr .Hdr .Name ].Port = int (rr .Port )
236
- entries [rr .Hdr .Name ].TTL = rr .Hdr .Ttl
266
+ entries [rr .Hdr .Name ].Expiry = now . Add ( time . Duration ( rr .Hdr .Ttl ) * time . Second )
237
267
case * dns.TXT :
238
268
if params .ServiceInstanceName () != "" && params .ServiceInstanceName () != rr .Hdr .Name {
239
269
continue
@@ -247,7 +277,7 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {
247
277
params .Domain )
248
278
}
249
279
entries [rr .Hdr .Name ].Text = rr .Txt
250
- entries [rr .Hdr .Name ].TTL = rr .Hdr .Ttl
280
+ entries [rr .Hdr .Name ].Expiry = now . Add ( time . Duration ( rr .Hdr .Ttl ) * time . Second )
251
281
}
252
282
}
253
283
// Associate IPs in a second round as other fields should be filled by now.
@@ -271,12 +301,15 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {
271
301
272
302
if len (entries ) > 0 {
273
303
for k , e := range entries {
274
- if e .TTL == 0 {
304
+ c .mutex .Lock ()
305
+ if ! e .Expiry .After (now ) {
275
306
delete (entries , k )
276
- delete (sentEntries , k )
307
+ delete (c .sentEntries , k )
308
+ c .mutex .Unlock ()
277
309
continue
278
310
}
279
- if _ , ok := sentEntries [k ]; ok {
311
+ if _ , ok := c .sentEntries [k ]; ok {
312
+ c .mutex .Unlock ()
280
313
continue
281
314
}
282
315
@@ -286,14 +319,16 @@ func (c *client) mainloop(ctx context.Context, params *lookupParams) {
286
319
// Require at least one resolved IP address for ServiceEntry
287
320
// TODO: wait some more time as chances are high both will arrive.
288
321
if len (e .AddrIPv4 ) == 0 && len (e .AddrIPv6 ) == 0 {
322
+ c .mutex .Unlock ()
289
323
continue
290
324
}
291
325
}
292
326
// Submit entry to subscriber and cache it.
293
327
// This is also a point to possibly stop probing actively for a
294
328
// service entry.
329
+ c .sentEntries [k ] = e
330
+ c .mutex .Unlock ()
295
331
params .Entries <- e
296
- sentEntries [k ] = e
297
332
if ! params .isBrowsing {
298
333
params .disableProbing ()
299
334
}
0 commit comments