Skip to content

Commit 7159b95

Browse files
committed
feat: support UDP and Unix protocol in client
1 parent 5d8fe64 commit 7159b95

17 files changed

+459
-53
lines changed

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ jobs:
6363
run: go test $(go list ./... | tail -n +2)
6464

6565
- name: Run integration testing
66-
run: go test -v -race -coverprofile="coverage.report" -covermode=atomic -timeout 60s
66+
run: go test -v -race -coverprofile="coverage.report" -covermode=atomic -timeout 120s
6767

6868
- name: Upload the code coverage report to codecov.io
6969
uses: codecov/codecov-action@v2

.github/workflows/ci_poll_opt.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,4 @@ jobs:
6060
${{ runner.os }}-${{ matrix.go }}-go-ci
6161
6262
- name: Run integration testing with poll_opt build tag
63-
run: go test -v -tags=poll_opt -timeout 60s
63+
run: go test -v -tags=poll_opt -timeout 120s

acceptor_unix.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ import (
2929
"github.com/panjf2000/gnet/pkg/logging"
3030
)
3131

32-
func (svr *server) acceptNewConnection(_ netpoll.IOEvent) error {
33-
nfd, sa, err := unix.Accept(svr.ln.fd)
32+
func (svr *server) acceptNewConnection(fd int, _ netpoll.IOEvent) error {
33+
nfd, sa, err := unix.Accept(fd)
3434
if err != nil {
3535
if err == unix.EAGAIN {
3636
return nil
@@ -42,14 +42,14 @@ func (svr *server) acceptNewConnection(_ netpoll.IOEvent) error {
4242
return err
4343
}
4444

45-
netAddr := socket.SockaddrToTCPOrUnixAddr(sa)
45+
remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
4646
if svr.opts.TCPKeepAlive > 0 && svr.ln.network == "tcp" {
4747
err = socket.SetKeepAlive(nfd, int(svr.opts.TCPKeepAlive/time.Second))
4848
logging.Error(err)
4949
}
5050

51-
el := svr.lb.next(netAddr)
52-
c := newTCPConn(nfd, el, sa, svr.opts.Codec, el.ln.lnaddr, netAddr)
51+
el := svr.lb.next(remoteAddr)
52+
c := newTCPConn(nfd, el, sa, svr.opts.Codec, el.ln.lnaddr, remoteAddr)
5353

5454
err = el.poller.UrgentTrigger(el.loopRegister, c)
5555
if err != nil {
@@ -59,9 +59,9 @@ func (svr *server) acceptNewConnection(_ netpoll.IOEvent) error {
5959
return nil
6060
}
6161

62-
func (el *eventloop) loopAccept(_ netpoll.IOEvent) error {
62+
func (el *eventloop) loopAccept(fd int, _ netpoll.IOEvent) error {
6363
if el.ln.network == "udp" {
64-
return el.loopReadUDP(el.ln.fd)
64+
return el.loopReadUDP(fd)
6565
}
6666

6767
nfd, sa, err := unix.Accept(el.ln.fd)

client.go

+23-14
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"context"
2222
"errors"
2323
"net"
24+
"strconv"
25+
"strings"
2426
"sync"
2527
"syscall"
2628
"time"
@@ -45,6 +47,7 @@ type Client struct {
4547
func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err error) {
4648
options := loadOptions(opts...)
4749
cli = new(Client)
50+
cli.opts = options
4851
var logger logging.Logger
4952
if options.LogPath != "" {
5053
if logger, cli.logFlush, err = logging.CreateLoggerAsLocalFile(options.LogPath, options.LogLevel); err != nil {
@@ -59,7 +62,6 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro
5962
if options.Codec == nil {
6063
cli.opts.Codec = new(BuiltInFrameCodec)
6164
}
62-
cli.opts = options
6365
var p *netpoll.Poller
6466
if p, err = netpoll.OpenPoller(); err != nil {
6567
return
@@ -68,7 +70,7 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro
6870
svr.opts = options
6971
svr.eventHandler = eventHandler
7072
svr.ln = new(listener)
71-
73+
svr.ln.network = "udp"
7274
svr.cond = sync.NewCond(&sync.Mutex{})
7375
if options.Ticker {
7476
svr.tickerCtx, svr.cancelTicker = context.WithCancel(context.Background())
@@ -83,6 +85,7 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro
8385
options.ReadBufferCap = toolkit.CeilToPowerOfTwo(rbc)
8486
}
8587
el.buffer = make([]byte, options.ReadBufferCap)
88+
el.udpSockets = make(map[int]*conn)
8689
el.connections = make(map[int]*conn)
8790
el.eventHandler = eventHandler
8891
cli.el = el
@@ -94,7 +97,7 @@ func (cli *Client) Start() error {
9497
cli.el.eventHandler.OnInitComplete(Server{})
9598
cli.el.svr.wg.Add(1)
9699
go func() {
97-
cli.el.activateSubReactor(cli.opts.LockOSThread)
100+
cli.el.loopRun(cli.opts.LockOSThread)
98101
cli.el.svr.wg.Done()
99102
}()
100103
// Start the ticker.
@@ -149,16 +152,19 @@ func (cli *Client) Dial(network, address string) (Conn, error) {
149152
return nil, e
150153
}
151154

152-
if cli.opts.TCPNoDelay == TCPNoDelay {
153-
if err = socket.SetNoDelay(DupFD, 1); err != nil {
154-
return nil, err
155+
if strings.HasPrefix(network, "tcp") {
156+
if cli.opts.TCPNoDelay == TCPNoDelay {
157+
if err = socket.SetNoDelay(DupFD, 1); err != nil {
158+
return nil, err
159+
}
155160
}
156-
}
157-
if cli.opts.TCPKeepAlive > 0 {
158-
if err = socket.SetKeepAlive(DupFD, int(cli.opts.TCPKeepAlive/time.Second)); err != nil {
159-
return nil, err
161+
if cli.opts.TCPKeepAlive > 0 {
162+
if err = socket.SetKeepAlive(DupFD, int(cli.opts.TCPKeepAlive/time.Second)); err != nil {
163+
return nil, err
164+
}
160165
}
161166
}
167+
162168
if cli.opts.SocketSendBuffer > 0 {
163169
if err = socket.SetSendBuffer(DupFD, cli.opts.SocketSendBuffer); err != nil {
164170
return nil, err
@@ -176,26 +182,29 @@ func (cli *Client) Dial(network, address string) (Conn, error) {
176182
)
177183
switch c.(type) {
178184
case *net.UnixConn:
179-
if sockAddr, _, _, err = socket.GetUnixSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil {
185+
if sockAddr, _, _, err = socket.GetUnixSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
180186
return nil, err
181187
}
188+
ua := c.LocalAddr().(*net.UnixAddr)
189+
ua.Name = c.RemoteAddr().String() + "." + strconv.Itoa(DupFD)
182190
gc = newTCPConn(DupFD, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr())
183191
case *net.TCPConn:
184-
if sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil {
192+
if sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
185193
return nil, err
186194
}
187195
gc = newTCPConn(DupFD, cli.el, sockAddr, cli.opts.Codec, c.LocalAddr(), c.RemoteAddr())
188196
case *net.UDPConn:
189-
if sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.LocalAddr().Network(), c.LocalAddr().String()); err != nil {
197+
if sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
190198
return nil, err
191199
}
192-
gc = newUDPConn(DupFD, cli.el, c.LocalAddr(), sockAddr)
200+
gc = newUDPConn(DupFD, cli.el, c.LocalAddr(), sockAddr, true)
193201
default:
194202
return nil, gerrors.ErrUnsupportedProtocol
195203
}
196204
err = cli.el.poller.UrgentTrigger(cli.el.loopRegister, gc)
197205
if err != nil {
198206
gc.Close()
207+
return nil, err
199208
}
200209
return gc, nil
201210
}

0 commit comments

Comments
 (0)