Skip to content

Commit 8376b37

Browse files
committed
feat: refactoring udp_task_pool to avoid task channel not being fully processed yet during gc
1 parent 246e59d commit 8376b37

File tree

4 files changed

+24
-67
lines changed

4 files changed

+24
-67
lines changed

control/udp_task_pool.go

Lines changed: 18 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
package control
22

33
import (
4+
"context"
45
"sync"
5-
"sync/atomic"
66
"time"
7-
8-
ants "github.com/panjf2000/ants/v2"
97
)
108

11-
var isTest = false
12-
139
const UdpTaskQueueLength = 128
1410

1511
type UdpTask = func()
@@ -21,35 +17,19 @@ type UdpTaskQueue struct {
2117
ch chan UdpTask
2218
timer *time.Timer
2319
agingTime time.Duration
24-
closed atomic.Bool
25-
freed chan struct{}
26-
}
27-
28-
func (q *UdpTaskQueue) Push(task UdpTask) {
29-
q.timer.Reset(q.agingTime)
30-
q.ch <- task
20+
ctx context.Context
21+
closed chan struct{}
3122
}
3223

3324
func (q *UdpTaskQueue) convoy() {
3425
for {
35-
if q.closed.Load() {
36-
clearloop:
37-
for {
38-
select {
39-
case t := <-q.ch:
40-
// Emit it back due to closed q.
41-
ReemitWorkers.Submit(func() {
42-
q.p.EmitTask(q.key, t)
43-
})
44-
default:
45-
break clearloop
46-
}
47-
}
48-
close(q.freed)
26+
select {
27+
case <-q.ctx.Done():
28+
close(q.closed)
4929
return
50-
} else {
51-
t := <-q.ch
52-
t()
30+
case task := <-q.ch:
31+
task()
32+
q.timer.Reset(q.agingTime)
5333
}
5434
}
5535
}
@@ -78,49 +58,33 @@ func (p *UdpTaskPool) EmitTask(key string, task UdpTask) {
7858
q, ok := p.m[key]
7959
if !ok {
8060
ch := p.queueChPool.Get().(chan UdpTask)
61+
ctx, cancel := context.WithCancel(context.Background())
8162
q = &UdpTaskQueue{
8263
key: key,
8364
p: p,
8465
ch: ch,
8566
timer: nil,
8667
agingTime: DefaultNatTimeout,
87-
closed: atomic.Bool{},
88-
freed: make(chan struct{}),
68+
ctx: ctx,
69+
closed: make(chan struct{}),
8970
}
9071
q.timer = time.AfterFunc(q.agingTime, func() {
91-
// This func may be invoked twice due to concurrent Reset.
92-
if !q.closed.CompareAndSwap(false, true) {
93-
return
94-
}
95-
if isTest {
96-
time.Sleep(3 * time.Microsecond)
97-
}
72+
// if timer executed, there should no task in queue.
73+
// q.closed should not blocking things.
9874
p.mu.Lock()
99-
if p.m[key] == q {
100-
delete(p.m, key)
101-
}
75+
cancel()
76+
delete(p.m, key)
10277
p.mu.Unlock()
103-
// Trigger next loop in func convoy
104-
q.ch <- func() {}
105-
<-q.freed
78+
<-q.closed
10679
p.queueChPool.Put(ch)
10780
})
10881
p.m[key] = q
10982
go q.convoy()
11083
}
11184
p.mu.Unlock()
112-
q.Push(task)
85+
q.ch <- task
11386
}
11487

11588
var (
11689
DefaultUdpTaskPool = NewUdpTaskPool()
117-
ReemitWorkers *ants.Pool
11890
)
119-
120-
func init() {
121-
var err error
122-
ReemitWorkers, err = ants.NewPool(UdpTaskQueueLength/2, ants.WithExpiryDuration(AnyfromTimeout))
123-
if err != nil {
124-
panic(err)
125-
}
126-
}

control/udp_task_pool_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,20 @@ import (
1313
"github.com/stretchr/testify/require"
1414
)
1515

16+
// Should run successfully in less than 3.2 seconds.
1617
func TestUdpTaskPool(t *testing.T) {
17-
isTest = true
1818
c, err := cpu.Times(false)
1919
require.NoError(t, err)
2020
t.Log(c)
2121
DefaultNatTimeout = 1000 * time.Microsecond
2222
for i := 0; i < 100; i++ {
23-
DefaultUdpTaskPool.EmitTask("testkey", func() {
24-
})
23+
DefaultUdpTaskPool.EmitTask("testkey", func() { time.Sleep(100 * time.Microsecond) })
2524
time.Sleep(99 * time.Microsecond)
2625
}
27-
time.Sleep(5 * time.Second)
26+
time.Sleep(1 * time.Second)
27+
DefaultUdpTaskPool.EmitTask("testkey", func() { time.Sleep(100 * time.Second) })
28+
time.Sleep(2 * time.Second)
29+
DefaultUdpTaskPool.EmitTask("testkey", func() { time.Sleep(100 * time.Second) })
2830
c, err = cpu.Times(false)
2931
require.NoError(t, err)
3032
t.Log(c)

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ require (
1818
github.com/miekg/dns v1.1.61
1919
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
2020
github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd
21-
github.com/panjf2000/ants/v2 v2.0.0
2221
github.com/safchain/ethtool v0.4.1
2322
github.com/shirou/gopsutil/v4 v4.24.6
2423
github.com/sirupsen/logrus v1.9.3

go.sum

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,6 @@ github.com/cloudflare/circl v1.3.9/go.mod h1:PDRU+oXvdD7KCtgKxW95M5Z8BpSCJXQORiZ
2626
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
2727
github.com/daeuniverse/dae-config-dist/go/dae_config v0.0.0-20230604120805-1c27619b592d h1:hnC39MjR7xt5kZjrKlef7DXKFDkiX8MIcDXYC/6Jf9Q=
2828
github.com/daeuniverse/dae-config-dist/go/dae_config v0.0.0-20230604120805-1c27619b592d/go.mod h1:VGWGgv7pCP5WGyHGUyb9+nq/gW0yBm+i/GfCNATOJ1M=
29-
github.com/daeuniverse/outbound v0.0.0-20250217131751-6b9008a8de11 h1:/YgVshQydYcTDNp/8N++0t4fNVLxGqF5UWGJA3lo/ZU=
30-
github.com/daeuniverse/outbound v0.0.0-20250217131751-6b9008a8de11/go.mod h1:fywFXIIfFeyG+oMat6h7MExY99CNtERbhrH0DYSr/6g=
31-
github.com/daeuniverse/outbound v0.0.0-20250217155837-ed1630717a5c h1:jlx7EJs/YuCmx1WfKCPh3QoPPKqqSjf4Zbj2wBuR20o=
32-
github.com/daeuniverse/outbound v0.0.0-20250217155837-ed1630717a5c/go.mod h1:fywFXIIfFeyG+oMat6h7MExY99CNtERbhrH0DYSr/6g=
33-
github.com/daeuniverse/outbound v0.0.0-20250218165550-ad1a2c485f36 h1:qfjdvxdbxCTVfqCe68cMC7QMXJniil5ZNAxs0TrIj0w=
34-
github.com/daeuniverse/outbound v0.0.0-20250218165550-ad1a2c485f36/go.mod h1:fywFXIIfFeyG+oMat6h7MExY99CNtERbhrH0DYSr/6g=
3529
github.com/daeuniverse/outbound v0.0.0-20250219135309-c607702d1c85 h1:g+V4WLWTZLXRCHmjgXH8W9kgYf+6QBPOS4q7plcHMFk=
3630
github.com/daeuniverse/outbound v0.0.0-20250219135309-c607702d1c85/go.mod h1:fywFXIIfFeyG+oMat6h7MExY99CNtERbhrH0DYSr/6g=
3731
github.com/daeuniverse/quic-go v0.0.0-20250210145620-2083199a7851 h1:AK4qfFw5CcHdOJcEpZj443NqskjhTvc+2cLOB5Cvrmk=
@@ -154,8 +148,6 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
154148
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
155149
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
156150
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
157-
github.com/panjf2000/ants/v2 v2.0.0 h1:MvUd+EfTcLl9l8Mh6nQkMQaE4cLAewd3bv97ajOyldQ=
158-
github.com/panjf2000/ants/v2 v2.0.0/go.mod h1:1GFm8bV8nyCQvU5K4WvBCTG1/YBFOD2VzjffD8fV55A=
159151
github.com/pierrec/lz4/v4 v4.1.2/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
160152
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
161153
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=

0 commit comments

Comments
 (0)