Skip to content

Commit 3594d22

Browse files
authored
opt: reduce duplicate code of I/O processing (#587)
1 parent 39c175b commit 3594d22

10 files changed

+51
-202
lines changed

acceptor_unix.go

+2-13
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error
3333
nfd, sa, err := socket.Accept(fd)
3434
if err != nil {
3535
switch err {
36-
case unix.EAGAIN: // the Accept queue has been drained, we can return now
36+
case unix.EAGAIN: // the Accept queue has been drained out, we can return now
3737
return nil
3838
case unix.EINTR, unix.ECONNRESET, unix.ECONNABORTED:
3939
// ECONNRESET or ECONNABORTED could indicate that a socket
@@ -93,16 +93,5 @@ func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) e
9393
}
9494

9595
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
96-
addEvents := el.poller.AddRead
97-
if el.engine.opts.EdgeTriggeredIO {
98-
addEvents = el.poller.AddReadWrite
99-
}
100-
if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil {
101-
el.getLogger().Errorf("failed to register the accepted socket fd=%d to poller: %v", c.fd, err)
102-
_ = unix.Close(c.fd)
103-
c.release()
104-
return err
105-
}
106-
el.connections.addConn(c, el.idx)
107-
return el.open(c)
96+
return el.register0(c)
10897
}

connection_bsd.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,14 @@ func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) (
3939
if flags&unix.EV_EOF != 0 && c.opened && err == nil {
4040
switch filter {
4141
case unix.EVFILT_READ:
42-
// Receive the event of EVFILT_READ | EV_EOF, but the previous eventloop.read
42+
// Received the event of EVFILT_READ|EV_EOF, but the previous eventloop.read
4343
// failed to drain the socket buffer, so we make sure we get it done this time.
4444
c.isEOF = true
4545
err = el.read(c)
4646
case unix.EVFILT_WRITE:
47-
// On macOS, the kqueue in both LT and ET mode will notify with one event for the EOF
48-
// of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason, two
49-
// events will be issued in ET mode for the EOF of the Unix remote in this order:
47+
// On macOS, the kqueue in either LT or ET mode will notify with one event for the
48+
// EOF of the TCP remote: EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. But for some reason,
49+
// two events will be issued in ET mode for the EOF of the Unix remote in this order:
5050
// 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF.
5151
err = el.write(c)
5252
default:

connection_linux.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
2929
el := c.loop
3030
// First check for any unexpected non-IO events.
31-
// For these events we just close the corresponding connection directly.
31+
// For these events we just close the connection directly.
3232
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
3333
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
3434
return el.close(c, io.EOF)
@@ -40,9 +40,9 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
4040
// offload the incoming traffic by writing all pending data back to the remotes
4141
// before continuing to read and handle requests.
4242
// 2. When the connection is dead, we need to try writing any pending data back
43-
// to the remote and close the connection first.
43+
// to the remote first and then close the connection.
4444
//
45-
// We perform eventloop.write for EPOLLOUT because it will take good care of either case.
45+
// We perform eventloop.write for EPOLLOUT because it can take good care of either case.
4646
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
4747
if err := el.write(c); err != nil {
4848
return err
@@ -61,8 +61,8 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
6161
if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly
6262
return el.close(c, io.EOF)
6363
}
64-
// Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read
65-
// failed to drain the socket buffer, so we make sure we get it done this time.
64+
// Received the event of EPOLLIN|EPOLLRDHUP, but the previous eventloop.read
65+
// failed to drain the socket buffer, so we ensure to get it done this time.
6666
c.isEOF = true
6767
return el.read(c)
6868
}

eventloop_unix.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,20 +75,20 @@ func (el *eventloop) register(itf interface{}) error {
7575
c = ccb.c
7676
defer ccb.cb()
7777
}
78+
return el.register0(c)
79+
}
7880

81+
func (el *eventloop) register0(c *conn) error {
7982
addEvents := el.poller.AddRead
8083
if el.engine.opts.EdgeTriggeredIO {
8184
addEvents = el.poller.AddReadWrite
8285
}
83-
8486
if err := addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil {
8587
_ = unix.Close(c.fd)
8688
c.release()
8789
return err
8890
}
89-
9091
el.connections.addConn(c, el.idx)
91-
9292
if c.isDatagram && c.remote != nil {
9393
return nil
9494
}

eventloop_windows.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ package gnet
1717
import (
1818
"bytes"
1919
"context"
20+
"errors"
2021
"runtime"
2122
"strings"
2223
"sync/atomic"
2324
"time"
2425

25-
"github.com/panjf2000/gnet/v2/pkg/errors"
26+
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
2627
"github.com/panjf2000/gnet/v2/pkg/logging"
2728
)
2829

@@ -79,7 +80,7 @@ func (el *eventloop) run() (err error) {
7980
err = v()
8081
}
8182

82-
if err == errors.ErrEngineShutdown {
83+
if errors.Is(err, errorx.ErrEngineShutdown) {
8384
el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
8485
break
8586
} else if err != nil {
@@ -121,7 +122,7 @@ func (el *eventloop) read(c *conn) error {
121122
case Close:
122123
return el.close(c, nil)
123124
case Shutdown:
124-
return errors.ErrEngineShutdown
125+
return errorx.ErrEngineShutdown
125126
}
126127
_, _ = c.inboundBuffer.Write(c.buffer.B)
127128
c.buffer.Reset()
@@ -132,7 +133,7 @@ func (el *eventloop) read(c *conn) error {
132133
func (el *eventloop) readUDP(c *conn) error {
133134
action := el.eventHandler.OnTraffic(c)
134135
if action == Shutdown {
135-
return errors.ErrEngineShutdown
136+
return errorx.ErrEngineShutdown
136137
}
137138
c.release()
138139
return nil
@@ -160,7 +161,7 @@ func (el *eventloop) ticker(ctx context.Context) {
160161
case Shutdown:
161162
if !shutdown {
162163
shutdown = true
163-
el.ch <- errors.ErrEngineShutdown
164+
el.ch <- errorx.ErrEngineShutdown
164165
el.getLogger().Debugf("stopping ticker in event-loop(%d) from Tick()", el.idx)
165166
}
166167
}
@@ -220,7 +221,7 @@ func (el *eventloop) handleAction(c *conn, action Action) error {
220221
case Close:
221222
return el.close(c, nil)
222223
case Shutdown:
223-
return errors.ErrEngineShutdown
224+
return errorx.ErrEngineShutdown
224225
default:
225226
return nil
226227
}

gnet_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1484,7 +1484,7 @@ func (s *simServer) OnTraffic(c Conn) (action Action) {
14841484
var packets [][]byte
14851485
for {
14861486
data, err := codec.Decode(c)
1487-
if err == errIncompletePacket {
1487+
if errors.Is(err, errIncompletePacket) {
14881488
break
14891489
}
14901490
if err != nil {

reactor_epoll_default.go

+8-93
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818
package gnet
1919

2020
import (
21-
"io"
21+
"errors"
2222
"runtime"
2323

24-
"golang.org/x/sys/unix"
25-
2624
"github.com/panjf2000/gnet/v2/internal/netpoll"
27-
"github.com/panjf2000/gnet/v2/pkg/errors"
25+
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
2826
)
2927

3028
func (el *eventloop) rotate() error {
@@ -34,7 +32,7 @@ func (el *eventloop) rotate() error {
3432
}
3533

3634
err := el.poller.Polling(el.accept0)
37-
if err == errors.ErrEngineShutdown {
35+
if errors.Is(err, errorx.ErrEngineShutdown) {
3836
el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err)
3937
err = nil
4038
} else if err != nil {
@@ -52,57 +50,16 @@ func (el *eventloop) orbit() error {
5250
defer runtime.UnlockOSThread()
5351
}
5452

55-
err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
53+
err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
5654
c := el.connections.getConn(fd)
5755
if c == nil {
5856
// Somehow epoll notified with an event for a stale fd that is not in our connection set.
5957
// We need to delete it from the epoll set.
6058
return el.poller.Delete(fd)
6159
}
62-
63-
// First check for any unexpected non-IO events.
64-
// For these events we just close the corresponding connection directly.
65-
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
66-
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
67-
return el.close(c, io.EOF)
68-
}
69-
// Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority
70-
// than the latter regardless of the aliveness of the current connection:
71-
//
72-
// 1. When the connection is alive and the system is overloaded, we want to
73-
// offload the incoming traffic by writing all pending data back to the remotes
74-
// before continuing to read and handle requests.
75-
// 2. When the connection is dead, we need to try writing any pending data back
76-
// to the remote and close the connection first.
77-
//
78-
// We perform eventloop.write for EPOLLOUT because it will take good care of either case.
79-
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
80-
if err := el.write(c); err != nil {
81-
return err
82-
}
83-
}
84-
// Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in
85-
// the socket buffer.
86-
if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 {
87-
if err := el.read(c); err != nil {
88-
return err
89-
}
90-
}
91-
// Ultimately, check for EPOLLRDHUP, this event indicates that the remote has
92-
// either closed connection or shut down the writing half of the connection.
93-
if ev&unix.EPOLLRDHUP != 0 && c.opened {
94-
if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly
95-
return el.close(c, io.EOF)
96-
}
97-
// Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read
98-
// failed to drain the socket buffer, so we make sure we get it done this time.
99-
c.isEOF = true
100-
return el.read(c)
101-
}
102-
return nil
60+
return c.processIO(fd, ev, flags)
10361
})
104-
105-
if err == errors.ErrEngineShutdown {
62+
if errors.Is(err, errorx.ErrEngineShutdown) {
10663
el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
10764
err = nil
10865
} else if err != nil {
@@ -130,52 +87,10 @@ func (el *eventloop) run() error {
13087
// Somehow epoll notified with an event for a stale fd that is not in our connection set.
13188
// We need to delete it from the epoll set.
13289
return el.poller.Delete(fd)
133-
134-
}
135-
136-
// First check for any unexpected non-IO events.
137-
// For these events we just close the corresponding connection directly.
138-
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
139-
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
140-
return el.close(c, io.EOF)
141-
}
142-
// Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority
143-
// than the latter regardless of the aliveness of the current connection:
144-
//
145-
// 1. When the connection is alive and the system is overloaded, we want to
146-
// offload the incoming traffic by writing all pending data back to the remotes
147-
// before continuing to read and handle requests.
148-
// 2. When the connection is dead, we need to try writing any pending data back
149-
// to the remote and close the connection first.
150-
//
151-
// We perform eventloop.write for EPOLLOUT because it will take good care of either case.
152-
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
153-
if err := el.write(c); err != nil {
154-
return err
155-
}
15690
}
157-
// Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in
158-
// the socket buffer.
159-
if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 {
160-
if err := el.read(c); err != nil {
161-
return err
162-
}
163-
}
164-
// Ultimately, check for EPOLLRDHUP, this event indicates that the remote has
165-
// either closed connection or shut down the writing half of the connection.
166-
if ev&unix.EPOLLRDHUP != 0 && c.opened {
167-
if ev&unix.EPOLLIN == 0 { // unreadable EPOLLRDHUP, close the connection directly
168-
return el.close(c, io.EOF)
169-
}
170-
// Received the event of EPOLLIN | EPOLLRDHUP, but the previous eventloop.read
171-
// failed to drain the socket buffer, so we make sure we get it done this time.
172-
c.isEOF = true
173-
return el.read(c)
174-
}
175-
return nil
91+
return c.processIO(fd, ev, flags)
17692
})
177-
178-
if err == errors.ErrEngineShutdown {
93+
if errors.Is(err, errorx.ErrEngineShutdown) {
17994
el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
18095
err = nil
18196
} else if err != nil {

reactor_epoll_ultimate.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
package gnet
1919

2020
import (
21+
"errors"
2122
"runtime"
2223

23-
"github.com/panjf2000/gnet/v2/pkg/errors"
24+
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
2425
)
2526

2627
func (el *eventloop) rotate() error {
@@ -30,7 +31,7 @@ func (el *eventloop) rotate() error {
3031
}
3132

3233
err := el.poller.Polling()
33-
if err == errors.ErrEngineShutdown {
34+
if errors.Is(err, errorx.ErrEngineShutdown) {
3435
el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err)
3536
err = nil
3637
} else if err != nil {
@@ -49,7 +50,7 @@ func (el *eventloop) orbit() error {
4950
}
5051

5152
err := el.poller.Polling()
52-
if err == errors.ErrEngineShutdown {
53+
if errors.Is(err, errorx.ErrEngineShutdown) {
5354
el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
5455
err = nil
5556
} else if err != nil {
@@ -69,7 +70,7 @@ func (el *eventloop) run() error {
6970
}
7071

7172
err := el.poller.Polling()
72-
if err == errors.ErrEngineShutdown {
73+
if errors.Is(err, errorx.ErrEngineShutdown) {
7374
el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err)
7475
err = nil
7576
} else if err != nil {

0 commit comments

Comments
 (0)