Skip to content

Commit b5f9479

Browse files
fix: solve the problem that the DNS dialer does not perform switching & UDP traffic gets stuck (#782)
Co-authored-by: Markson Hon <[email protected]>
1 parent a015e91 commit b5f9479

File tree

5 files changed

+57
-71
lines changed

5 files changed

+57
-71
lines changed

control/dns_control.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"strconv"
1515
"strings"
1616
"sync"
17+
"sync/atomic"
1718
"time"
1819

1920
"github.com/daeuniverse/dae/common/consts"
@@ -79,7 +80,12 @@ type DnsController struct {
7980
dnsCacheMu sync.Mutex
8081
dnsCache map[string]*DnsCache
8182
dnsForwarderCacheMu sync.Mutex
82-
dnsForwarderCache map[string]DnsForwarder
83+
dnsForwarderCache map[dnsForwarderKey]DnsForwarder
84+
}
85+
86+
type handlingState struct {
87+
mu sync.Mutex
88+
ref uint32
8389
}
8490

8591
func parseIpVersionPreference(prefer int) (uint16, error) {
@@ -117,7 +123,7 @@ func NewDnsController(routing *dns.Dns, option *DnsControllerOption) (c *DnsCont
117123
dnsCacheMu: sync.Mutex{},
118124
dnsCache: make(map[string]*DnsCache),
119125
dnsForwarderCacheMu: sync.Mutex{},
120-
dnsForwarderCache: make(map[string]DnsForwarder),
126+
dnsForwarderCache: make(map[dnsForwarderKey]DnsForwarder),
121127
}, nil
122128
}
123129

@@ -346,6 +352,11 @@ type dialArgument struct {
346352
mptcp bool
347353
}
348354

355+
type dnsForwarderKey struct {
356+
upstream string
357+
dialArgument dialArgument
358+
}
359+
349360
func (c *DnsController) Handle_(dnsMessage *dnsmessage.Msg, req *udpRequest) (err error) {
350361
if c.log.IsLevelEnabled(logrus.TraceLevel) && len(dnsMessage.Question) > 0 {
351362
q := dnsMessage.Question[0]
@@ -448,11 +459,17 @@ func (c *DnsController) handle_(
448459
}
449460

450461
// Do not handle the same lookup in parallel.
451-
_mu, _ := c.handling.LoadOrStore(cacheKey, new(sync.Mutex))
452-
mu := _mu.(*sync.Mutex)
453-
mu.Lock()
454-
defer mu.Unlock()
455-
defer c.handling.Delete(cacheKey)
462+
handlingState_, _ := c.handling.LoadOrStore(cacheKey, new(handlingState))
463+
handlingState := handlingState_.(*handlingState)
464+
atomic.AddUint32(&handlingState.ref, 1)
465+
handlingState.mu.Lock()
466+
defer func() {
467+
handlingState.mu.Unlock()
468+
atomic.AddUint32(&handlingState.ref, ^uint32(0))
469+
if atomic.LoadUint32(&handlingState.ref) == 0 {
470+
c.handling.Delete(cacheKey)
471+
}
472+
}()
456473

457474
if resp := c.LookupDnsRespCache_(dnsMessage, cacheKey, false); resp != nil {
458475
// Send cache to client directly.
@@ -562,14 +579,14 @@ func (c *DnsController) dialSend(invokingDepth int, req *udpRequest, data []byte
562579

563580
// get forwarder from cache
564581
c.dnsForwarderCacheMu.Lock()
565-
forwarder, ok := c.dnsForwarderCache[upstreamName]
582+
forwarder, ok := c.dnsForwarderCache[dnsForwarderKey{upstream: upstream.String(), dialArgument: *dialArgument}]
566583
if !ok {
567584
forwarder, err = newDnsForwarder(upstream, *dialArgument)
568585
if err != nil {
569586
c.dnsForwarderCacheMu.Unlock()
570587
return err
571588
}
572-
c.dnsForwarderCache[upstreamName] = forwarder
589+
c.dnsForwarderCache[dnsForwarderKey{upstream: upstream.String(), dialArgument: *dialArgument}] = forwarder
573590
}
574591
c.dnsForwarderCacheMu.Unlock()
575592

control/udp_task_pool.go

Lines changed: 25 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
package control
22

33
import (
4+
"context"
45
"sync"
5-
"sync/atomic"
66
"time"
7-
8-
ants "github.com/panjf2000/ants/v2"
97
)
108

11-
var isTest = false
12-
139
const UdpTaskQueueLength = 128
1410

1511
type UdpTask = func()
@@ -21,35 +17,19 @@ type UdpTaskQueue struct {
2117
ch chan UdpTask
2218
timer *time.Timer
2319
agingTime time.Duration
24-
closed atomic.Bool
25-
freed chan struct{}
26-
}
27-
28-
func (q *UdpTaskQueue) Push(task UdpTask) {
29-
q.timer.Reset(q.agingTime)
30-
q.ch <- task
20+
ctx context.Context
21+
closed chan struct{}
3122
}
3223

3324
func (q *UdpTaskQueue) convoy() {
3425
for {
35-
if q.closed.Load() {
36-
clearloop:
37-
for {
38-
select {
39-
case t := <-q.ch:
40-
// Emit it back due to closed q.
41-
ReemitWorkers.Submit(func() {
42-
q.p.EmitTask(q.key, t)
43-
})
44-
default:
45-
break clearloop
46-
}
47-
}
48-
close(q.freed)
26+
select {
27+
case <-q.ctx.Done():
28+
close(q.closed)
4929
return
50-
} else {
51-
t := <-q.ch
52-
t()
30+
case task := <-q.ch:
31+
task()
32+
q.timer.Reset(q.agingTime)
5333
}
5434
}
5535
}
@@ -78,49 +58,39 @@ func (p *UdpTaskPool) EmitTask(key string, task UdpTask) {
7858
q, ok := p.m[key]
7959
if !ok {
8060
ch := p.queueChPool.Get().(chan UdpTask)
61+
ctx, cancel := context.WithCancel(context.Background())
8162
q = &UdpTaskQueue{
8263
key: key,
8364
p: p,
8465
ch: ch,
8566
timer: nil,
8667
agingTime: DefaultNatTimeout,
87-
closed: atomic.Bool{},
88-
freed: make(chan struct{}),
68+
ctx: ctx,
69+
closed: make(chan struct{}),
8970
}
9071
q.timer = time.AfterFunc(q.agingTime, func() {
91-
// This func may be invoked twice due to concurrent Reset.
92-
if !q.closed.CompareAndSwap(false, true) {
93-
return
94-
}
95-
if isTest {
96-
time.Sleep(3 * time.Microsecond)
97-
}
72+
// If the timer fires, there should be no task left in the queue.
73+
// Waiting on q.closed is therefore not expected to block.
9874
p.mu.Lock()
99-
defer p.mu.Unlock()
100-
if p.m[key] == q {
101-
delete(p.m, key)
75+
cancel()
76+
delete(p.m, key)
77+
p.mu.Unlock()
78+
<-q.closed
79+
if len(ch) == 0 { // Otherwise let it be GCed
80+
p.queueChPool.Put(ch)
10281
}
103-
// Trigger next loop in func convoy
104-
q.ch <- func() {}
105-
<-q.freed
106-
p.queueChPool.Put(ch)
10782
})
10883
p.m[key] = q
10984
go q.convoy()
11085
}
11186
p.mu.Unlock()
112-
q.Push(task)
87+
// If the task cannot be enqueued before the queue's aging timer fires (DefaultNatTimeout, 180s), the queue may be torn down (its ctx canceled), so drop the task when that happens.
88+
select {
89+
case q.ch <- task:
90+
case <-q.ctx.Done():
91+
}
11392
}
11493

11594
var (
11695
DefaultUdpTaskPool = NewUdpTaskPool()
117-
ReemitWorkers *ants.Pool
11896
)
119-
120-
func init() {
121-
var err error
122-
ReemitWorkers, err = ants.NewPool(UdpTaskQueueLength/2, ants.WithExpiryDuration(AnyfromTimeout))
123-
if err != nil {
124-
panic(err)
125-
}
126-
}

control/udp_task_pool_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,20 @@ import (
1313
"github.com/stretchr/testify/require"
1414
)
1515

16+
// Should run successfully in less than 3.2 seconds.
1617
func TestUdpTaskPool(t *testing.T) {
17-
isTest = true
1818
c, err := cpu.Times(false)
1919
require.NoError(t, err)
2020
t.Log(c)
2121
DefaultNatTimeout = 1000 * time.Microsecond
2222
for i := 0; i < 100; i++ {
23-
DefaultUdpTaskPool.EmitTask("testkey", func() {
24-
})
23+
DefaultUdpTaskPool.EmitTask("testkey", func() { time.Sleep(100 * time.Microsecond) })
2524
time.Sleep(99 * time.Microsecond)
2625
}
27-
time.Sleep(5 * time.Second)
26+
time.Sleep(1 * time.Second)
27+
DefaultUdpTaskPool.EmitTask("testkey", func() { time.Sleep(100 * time.Second) })
28+
time.Sleep(2 * time.Second)
29+
DefaultUdpTaskPool.EmitTask("testkey", func() { time.Sleep(100 * time.Second) })
2830
c, err = cpu.Times(false)
2931
require.NoError(t, err)
3032
t.Log(c)

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ require (
1818
github.com/miekg/dns v1.1.61
1919
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
2020
github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd
21-
github.com/panjf2000/ants/v2 v2.0.0
2221
github.com/safchain/ethtool v0.4.1
2322
github.com/shirou/gopsutil/v4 v4.24.6
2423
github.com/sirupsen/logrus v1.9.3

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,6 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
150150
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
151151
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
152152
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
153-
github.com/panjf2000/ants/v2 v2.0.0 h1:MvUd+EfTcLl9l8Mh6nQkMQaE4cLAewd3bv97ajOyldQ=
154-
github.com/panjf2000/ants/v2 v2.0.0/go.mod h1:1GFm8bV8nyCQvU5K4WvBCTG1/YBFOD2VzjffD8fV55A=
155153
github.com/pierrec/lz4/v4 v4.1.2/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
156154
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
157155
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=

0 commit comments

Comments
 (0)