Skip to content

Commit d72d3de

Browse files
committed
opt: refactor the logic of handling UDP sockets
1 parent 4f2cfa3 commit d72d3de

8 files changed

+47
-36
lines changed

client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro
8585
options.ReadBufferCap = toolkit.CeilToPowerOfTwo(rbc)
8686
}
8787
el.buffer = make([]byte, options.ReadBufferCap)
88-
el.udpSockets = make(map[int]*conn)
88+
el.clientUDPSockets = make(map[int]*conn)
8989
el.connections = make(map[int]*conn)
9090
el.eventHandler = eventHandler
9191
cli.el = el

connection_unix.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ import (
3434

3535
type conn struct {
3636
fd int // file descriptor
37-
sa unix.Sockaddr // remote socket address
3837
ctx interface{} // user-defined context
38+
peer unix.Sockaddr // remote socket address
3939
loop *eventloop // connected event-loop
4040
codec ICodec // codec for TCP
4141
opened bool // connection opened event fired
@@ -50,7 +50,7 @@ type conn struct {
5050
func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, codec ICodec, localAddr, remoteAddr net.Addr) (c *conn) {
5151
c = &conn{
5252
fd: fd,
53-
sa: sa,
53+
peer: sa,
5454
loop: el,
5555
codec: codec,
5656
localAddr: localAddr,
@@ -65,7 +65,7 @@ func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, codec ICodec, localAddr
6565

6666
func (c *conn) releaseTCP() {
6767
c.opened = false
68-
c.sa = nil
68+
c.peer = nil
6969
c.ctx = nil
7070
c.localAddr = nil
7171
c.remoteAddr = nil
@@ -79,13 +79,13 @@ func (c *conn) releaseTCP() {
7979
func newUDPConn(fd int, el *eventloop, localAddr net.Addr, sa unix.Sockaddr, connected bool) (c *conn) {
8080
c = &conn{
8181
fd: fd,
82-
sa: sa,
82+
peer: sa,
8383
loop: el,
8484
localAddr: localAddr,
8585
remoteAddr: socket.SockaddrToUDPAddr(sa),
8686
}
8787
if connected {
88-
c.sa = nil
88+
c.peer = nil
8989
}
9090
return
9191
}
@@ -166,10 +166,10 @@ func (c *conn) asyncWrite(itf interface{}) error {
166166
func (c *conn) sendTo(buf []byte) error {
167167
c.loop.eventHandler.PreWrite(c)
168168
defer c.loop.eventHandler.AfterWrite(c, buf)
169-
if c.sa == nil {
169+
if c.peer == nil {
170170
return unix.Send(c.fd, buf, 0)
171171
}
172-
return unix.Sendto(c.fd, buf, 0, c.sa)
172+
return unix.Sendto(c.fd, buf, 0, c.peer)
173173
}
174174

175175
// ================================== Non-concurrency-safe API's ==================================

eventloop_unix.go

+27-20
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,16 @@ import (
3535
)
3636

3737
type eventloop struct {
38-
ln *listener // listener
39-
idx int // loop index in the server loops list
40-
svr *server // server in loop
41-
poller *netpoll.Poller // epoll or kqueue
42-
buffer []byte // read packet buffer whose capacity is set by user, default value is 64KB
43-
connCount int32 // number of active connections in event-loop
44-
udpSockets map[int]*conn // UDP socket map: fd -> conn
45-
connections map[int]*conn // TCP connection map: fd -> conn
46-
eventHandler EventHandler // user eventHandler
38+
ln *listener // listener
39+
idx int // loop index in the server loops list
40+
svr *server // server in loop
41+
poller *netpoll.Poller // epoll or kqueue
42+
buffer []byte // read packet buffer whose capacity is set by user, default value is 64KB
43+
connCount int32 // number of active connections in event-loop
44+
connections map[int]*conn // TCP connection map: fd -> conn
45+
eventHandler EventHandler // user eventHandler
46+
clientUDPSockets map[int]*conn // client-side UDP socket map: fd -> conn
47+
serverUDPSockets map[unix.Sockaddr]*conn // server-side UDP socket map: Sockaddr -> conn
4748
}
4849

4950
func (el *eventloop) getLogger() logging.Logger {
@@ -58,11 +59,17 @@ func (el *eventloop) loadConn() int32 {
5859
return atomic.LoadInt32(&el.connCount)
5960
}
6061

61-
func (el *eventloop) closeAllConns() {
62+
func (el *eventloop) closeAllSockets() {
6263
// Close loops and all outstanding connections
6364
for _, c := range el.connections {
6465
_ = el.loopCloseConn(c, nil)
6566
}
67+
for _, c := range el.clientUDPSockets {
68+
c.releaseUDP()
69+
}
70+
for _, c := range el.serverUDPSockets {
71+
c.releaseUDP()
72+
}
6673
}
6774

6875
func (el *eventloop) loopRegister(itf interface{}) error {
@@ -76,7 +83,7 @@ func (el *eventloop) loopRegister(itf interface{}) error {
7683
c.releaseUDP()
7784
return err
7885
}
79-
el.udpSockets[c.fd] = c
86+
el.clientUDPSockets[c.fd] = c
8087
return nil
8188
}
8289
if err := el.poller.AddRead(c.pollAttachment); err != nil {
@@ -277,11 +284,15 @@ func (el *eventloop) loopReadUDP(fd int) error {
277284
return fmt.Errorf("failed to read UDP packet from fd=%d in event-loop(%d), %v",
278285
fd, el.idx, os.NewSyscallError("recvfrom", err))
279286
}
280-
c := el.udpSockets[fd]
281-
var oneOff bool
282-
if c == nil {
283-
c = newUDPConn(fd, el, el.ln.lnaddr, sa, false)
284-
oneOff = true
287+
var c *conn
288+
if fd == el.ln.fd {
289+
c = el.serverUDPSockets[sa]
290+
if c == nil {
291+
c = newUDPConn(fd, el, el.ln.lnaddr, sa, false)
292+
el.serverUDPSockets[sa] = c
293+
}
294+
} else {
295+
c = el.clientUDPSockets[fd]
285296
}
286297
out, action := el.eventHandler.React(el.buffer[:n], c)
287298
if out != nil {
@@ -290,9 +301,5 @@ func (el *eventloop) loopReadUDP(fd int) error {
290301
if action == Shutdown {
291302
return gerrors.ErrServerShutdown
292303
}
293-
if oneOff {
294-
c.releaseUDP()
295-
}
296-
297304
return nil
298305
}

reactor_default_bsd.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (el *eventloop) activateSubReactor(lockOSThread bool) {
4848
}
4949

5050
defer func() {
51-
el.closeAllConns()
51+
el.closeAllSockets()
5252
el.svr.signalShutdown()
5353
}()
5454

@@ -81,7 +81,7 @@ func (el *eventloop) loopRun(lockOSThread bool) {
8181
}
8282

8383
defer func() {
84-
el.closeAllConns()
84+
el.closeAllSockets()
8585
el.ln.close()
8686
el.svr.signalShutdown()
8787
}()

reactor_default_linux.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (el *eventloop) activateSubReactor(lockOSThread bool) {
4747
}
4848

4949
defer func() {
50-
el.closeAllConns()
50+
el.closeAllSockets()
5151
el.svr.signalShutdown()
5252
}()
5353

@@ -96,7 +96,7 @@ func (el *eventloop) loopRun(lockOSThread bool) {
9696
}
9797

9898
defer func() {
99-
el.closeAllConns()
99+
el.closeAllSockets()
100100
el.ln.close()
101101
el.svr.signalShutdown()
102102
}()

reactor_optimized_bsd.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (el *eventloop) activateSubReactor(lockOSThread bool) {
4747
}
4848

4949
defer func() {
50-
el.closeAllConns()
50+
el.closeAllSockets()
5151
el.svr.signalShutdown()
5252
}()
5353

@@ -66,7 +66,7 @@ func (el *eventloop) loopRun(lockOSThread bool) {
6666
}
6767

6868
defer func() {
69-
el.closeAllConns()
69+
el.closeAllSockets()
7070
el.ln.close()
7171
el.svr.signalShutdown()
7272
}()

reactor_optimized_linux.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (el *eventloop) activateSubReactor(lockOSThread bool) {
4646
}
4747

4848
defer func() {
49-
el.closeAllConns()
49+
el.closeAllSockets()
5050
el.svr.signalShutdown()
5151
}()
5252

@@ -65,7 +65,7 @@ func (el *eventloop) loopRun(lockOSThread bool) {
6565
}
6666

6767
defer func() {
68-
el.closeAllConns()
68+
el.closeAllSockets()
6969
el.ln.close()
7070
el.svr.signalShutdown()
7171
}()

server_unix.go

+4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"sync"
2525
"sync/atomic"
2626

27+
"golang.org/x/sys/unix"
28+
2729
"github.com/panjf2000/gnet/internal/netpoll"
2830
"github.com/panjf2000/gnet/pkg/errors"
2931
)
@@ -109,6 +111,7 @@ func (svr *server) activateEventLoops(numEventLoop int) (err error) {
109111
el.svr = svr
110112
el.poller = p
111113
el.buffer = make([]byte, svr.opts.ReadBufferCap)
114+
el.serverUDPSockets = make(map[unix.Sockaddr]*conn)
112115
el.connections = make(map[int]*conn)
113116
el.eventHandler = svr.eventHandler
114117
if err = el.poller.AddRead(el.ln.packPollAttachment(el.loopAccept)); err != nil {
@@ -141,6 +144,7 @@ func (svr *server) activateReactors(numEventLoop int) error {
141144
el.svr = svr
142145
el.poller = p
143146
el.buffer = make([]byte, svr.opts.ReadBufferCap)
147+
el.serverUDPSockets = make(map[unix.Sockaddr]*conn)
144148
el.connections = make(map[int]*conn)
145149
el.eventHandler = svr.eventHandler
146150
svr.lb.register(el)

0 commit comments

Comments
 (0)