Skip to content

Commit f03f0bd

Browse files
authored
opt: clamp the event-loops in ET mode to avaoid starving (#599)
1 parent 4c3b84f commit f03f0bd

7 files changed

+84
-60
lines changed

connection_bsd.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) (
5050
// 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF.
5151
err = el.write(c)
5252
default:
53-
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
53+
c.outboundBuffer.Release() // don't bother to write to a connection that is already broken
5454
err = el.close(c, io.EOF)
5555
}
5656
}

connection_linux.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
2929
el := c.loop
3030
// First check for any unexpected non-IO events.
3131
// For these events we just close the connection directly.
32-
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
33-
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
32+
if ev&(netpoll.ErrEvents|unix.EPOLLRDHUP) != 0 && ev&netpoll.ReadWriteEvents == 0 {
33+
c.outboundBuffer.Release() // don't bother to write to a connection that is already broken
3434
return el.close(c, io.EOF)
3535
}
3636
// Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority
@@ -43,14 +43,14 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
4343
// to the remote first and then close the connection.
4444
//
4545
// We perform eventloop.write for EPOLLOUT because it can take good care of either case.
46-
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
46+
if ev&(netpoll.WriteEvents|netpoll.ErrEvents) != 0 {
4747
if err := el.write(c); err != nil {
4848
return err
4949
}
5050
}
5151
// Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in
5252
// the socket buffer.
53-
if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 {
53+
if ev&(netpoll.ReadEvents|netpoll.ErrEvents) != 0 {
5454
if err := el.read(c); err != nil {
5555
return err
5656
}

eventloop_unix.go

+36-5
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,18 @@ func (el *eventloop) open(c *conn) error {
114114
return el.handleAction(c, action)
115115
}
116116

117+
func (el *eventloop) read0(itf interface{}) error {
118+
return el.read(itf.(*conn))
119+
}
120+
121+
const maxBytesTransferET = 1 << 22
122+
117123
func (el *eventloop) read(c *conn) error {
118124
if !c.opened {
119125
return nil
120126
}
121127

128+
var recv int
122129
isET := el.engine.opts.EdgeTriggeredIO
123130
loop:
124131
n, err := unix.Read(c.fd, el.buffer)
@@ -131,6 +138,7 @@ loop:
131138
}
132139
return el.close(c, os.NewSyscallError("read", err))
133140
}
141+
recv += n
134142

135143
c.buffer = el.buffer[:n]
136144
action := el.eventHandler.OnTraffic(c)
@@ -144,13 +152,25 @@ loop:
144152
_, _ = c.inboundBuffer.Write(c.buffer)
145153
c.buffer = c.buffer[:0]
146154

147-
if isET || c.isEOF {
155+
if c.isEOF || (isET && recv < maxBytesTransferET) {
148156
goto loop
149157
}
150158

159+
// To prevent infinite reading in ET mode and starving other events,
160+
// we need to set up threshold for the maximum read bytes per connection
161+
// on each event-loop. If the threshold is reached and there are still
162+
// unread data in the socket buffer, we must issue another read event manually.
163+
if isET && n == len(el.buffer) {
164+
return el.poller.Trigger(queue.LowPriority, el.read0, c)
165+
}
166+
151167
return nil
152168
}
153169

170+
func (el *eventloop) write0(itf interface{}) error {
171+
return el.write(itf.(*conn))
172+
}
173+
154174
// The default value of UIO_MAXIOV/IOV_MAX is 1024 on Linux and most BSD-like OSs.
155175
const iovMax = 1024
156176

@@ -161,8 +181,9 @@ func (el *eventloop) write(c *conn) error {
161181

162182
isET := el.engine.opts.EdgeTriggeredIO
163183
var (
164-
n int
165-
err error
184+
n int
185+
sent int
186+
err error
166187
)
167188
loop:
168189
iov, _ := c.outboundBuffer.Peek(-1)
@@ -182,14 +203,24 @@ loop:
182203
default:
183204
return el.close(c, os.NewSyscallError("write", err))
184205
}
185-
if isET && !c.outboundBuffer.IsEmpty() {
206+
sent += n
207+
208+
if isET && !c.outboundBuffer.IsEmpty() && sent < maxBytesTransferET {
186209
goto loop
187210
}
188211

189212
// All data have been sent, it's no need to monitor the writable events for LT mode,
190213
// remove the writable event from poller to help the future event-loops if necessary.
191214
if !isET && c.outboundBuffer.IsEmpty() {
192-
_ = el.poller.ModRead(&c.pollAttachment, false)
215+
return el.poller.ModRead(&c.pollAttachment, false)
216+
}
217+
218+
// To prevent infinite writing in ET mode and starving other events,
219+
// we need to set up threshold for the maximum write bytes per connection
220+
// on each event-loop. If the threshold is reached and there are still
221+
// pending data to write, we must issue another write event manually.
222+
if isET && !c.outboundBuffer.IsEmpty() {
223+
return el.poller.Trigger(queue.HighPriority, el.write0, c)
193224
}
194225

195226
return nil

eventloop_unix_test.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ var (
4848
func BenchmarkGC4El100k(b *testing.B) {
4949
oldGc := debug.SetGCPercent(-1)
5050

51-
ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 100000)
51+
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 100000)
5252
b.Run("Run-4-eventloop-100000", func(b *testing.B) {
5353
for i := 0; i < b.N; i++ {
5454
runtime.GC()
@@ -62,7 +62,7 @@ func BenchmarkGC4El100k(b *testing.B) {
6262
func BenchmarkGC4El200k(b *testing.B) {
6363
oldGc := debug.SetGCPercent(-1)
6464

65-
ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 200000)
65+
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 200000)
6666
b.Run("Run-4-eventloop-200000", func(b *testing.B) {
6767
for i := 0; i < b.N; i++ {
6868
runtime.GC()
@@ -76,7 +76,7 @@ func BenchmarkGC4El200k(b *testing.B) {
7676
func BenchmarkGC4El500k(b *testing.B) {
7777
oldGc := debug.SetGCPercent(-1)
7878

79-
ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 500000)
79+
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 500000)
8080
b.Run("Run-4-eventloop-500000", func(b *testing.B) {
8181
for i := 0; i < b.N; i++ {
8282
runtime.GC()
@@ -146,73 +146,73 @@ func TestServeGC(t *testing.T) {
146146
if testBigGC {
147147
t.Skipf("Skip when testBigGC=%t", testBigGC)
148148
}
149-
testServeGC(t, "tcp", ":9000", true, true, 1, 10000)
149+
testServeGC(t, "tcp", ":0", true, true, 1, 10000)
150150
})
151151
t.Run("1-loop-100000", func(t *testing.T) {
152152
if !testBigGC {
153153
t.Skipf("Skip when testBigGC=%t", testBigGC)
154154
}
155-
testServeGC(t, "tcp", ":9000", true, true, 1, 100000)
155+
testServeGC(t, "tcp", ":0", true, true, 1, 100000)
156156
})
157157
t.Run("1-loop-1000000", func(t *testing.T) {
158158
if !testBigGC {
159159
t.Skipf("Skip when testBigGC=%t", testBigGC)
160160
}
161-
testServeGC(t, "tcp", ":9000", true, true, 1, 1000000)
161+
testServeGC(t, "tcp", ":0", true, true, 1, 1000000)
162162
})
163163
t.Run("2-loop-10000", func(t *testing.T) {
164164
if testBigGC {
165165
t.Skipf("Skip when testBigGC=%t", testBigGC)
166166
}
167-
testServeGC(t, "tcp", ":9000", true, true, 2, 10000)
167+
testServeGC(t, "tcp", ":0", true, true, 2, 10000)
168168
})
169169
t.Run("2-loop-100000", func(t *testing.T) {
170170
if !testBigGC {
171171
t.Skipf("Skip when testBigGC=%t", testBigGC)
172172
}
173-
testServeGC(t, "tcp", ":9000", true, true, 2, 100000)
173+
testServeGC(t, "tcp", ":0", true, true, 2, 100000)
174174
})
175175
t.Run("2-loop-1000000", func(t *testing.T) {
176176
if !testBigGC {
177177
t.Skipf("Skip when testBigGC=%t", testBigGC)
178178
}
179-
testServeGC(t, "tcp", ":9000", true, true, 2, 1000000)
179+
testServeGC(t, "tcp", ":0", true, true, 2, 1000000)
180180
})
181181
t.Run("4-loop-10000", func(t *testing.T) {
182182
if testBigGC {
183183
t.Skipf("Skip when testBigGC=%t", testBigGC)
184184
}
185-
testServeGC(t, "tcp", ":9000", true, true, 4, 10000)
185+
testServeGC(t, "tcp", ":0", true, true, 4, 10000)
186186
})
187187
t.Run("4-loop-100000", func(t *testing.T) {
188188
if !testBigGC {
189189
t.Skipf("Skip when testBigGC=%t", testBigGC)
190190
}
191-
testServeGC(t, "tcp", ":9000", true, true, 4, 100000)
191+
testServeGC(t, "tcp", ":0", true, true, 4, 100000)
192192
})
193193
t.Run("4-loop-1000000", func(t *testing.T) {
194194
if !testBigGC {
195195
t.Skipf("Skip when testBigGC=%t", testBigGC)
196196
}
197-
testServeGC(t, "tcp", ":9000", true, true, 4, 1000000)
197+
testServeGC(t, "tcp", ":0", true, true, 4, 1000000)
198198
})
199199
t.Run("16-loop-10000", func(t *testing.T) {
200200
if testBigGC {
201201
t.Skipf("Skip when testBigGC=%t", testBigGC)
202202
}
203-
testServeGC(t, "tcp", ":9000", true, true, 16, 10000)
203+
testServeGC(t, "tcp", ":0", true, true, 16, 10000)
204204
})
205205
t.Run("16-loop-100000", func(t *testing.T) {
206206
if !testBigGC {
207207
t.Skipf("Skip when testBigGC=%t", testBigGC)
208208
}
209-
testServeGC(t, "tcp", ":9000", true, true, 16, 100000)
209+
testServeGC(t, "tcp", ":0", true, true, 16, 100000)
210210
})
211211
t.Run("16-loop-1000000", func(t *testing.T) {
212212
if !testBigGC {
213213
t.Skipf("Skip when testBigGC=%t", testBigGC)
214214
}
215-
testServeGC(t, "tcp", ":9000", true, true, 16, 1000000)
215+
testServeGC(t, "tcp", ":0", true, true, 16, 1000000)
216216
})
217217
})
218218
}

internal/netpoll/defs_poller_epoll.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,14 @@ const (
3434
MinPollEventsCap = 32
3535
// MaxAsyncTasksAtOneTime is the maximum amount of asynchronous tasks that the event-loop will process at one time.
3636
MaxAsyncTasksAtOneTime = 256
37-
// ErrEvents represents exceptional events that are not read/write, like socket being closed,
38-
// reading/writing from/to a closed socket, etc.
39-
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP
37+
// ReadEvents represents readable events that are polled by epoll.
38+
ReadEvents = unix.EPOLLIN | unix.EPOLLPRI
39+
// WriteEvents represents writeable events that are polled by epoll.
40+
WriteEvents = unix.EPOLLOUT
41+
// ReadWriteEvents represents both readable and writeable events.
42+
ReadWriteEvents = ReadEvents | WriteEvents
43+
// ErrEvents represents exceptional events that occurred on the local side.
44+
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP
4045
)
4146

4247
type eventList struct {

internal/netpoll/poller_epoll_default.go

+10-16
Original file line numberDiff line numberDiff line change
@@ -193,57 +193,51 @@ func (p *Poller) Polling(callback PollEventHandler) error {
193193
}
194194
}
195195

196-
const (
197-
readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP
198-
writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP
199-
readWriteEvents = readEvents | writeEvents
200-
)
201-
202196
// AddReadWrite registers the given file-descriptor with readable and writable events to the poller.
203197
func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
204-
var ev uint32 = readWriteEvents
198+
var ev uint32 = ReadWriteEvents
205199
if edgeTriggered {
206-
ev |= unix.EPOLLET
200+
ev |= unix.EPOLLET | unix.EPOLLRDHUP
207201
}
208202
return os.NewSyscallError("epoll_ctl add",
209203
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
210204
}
211205

212206
// AddRead registers the given file-descriptor with readable event to the poller.
213207
func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
214-
var ev uint32 = readEvents
208+
var ev uint32 = ReadEvents
215209
if edgeTriggered {
216-
ev |= unix.EPOLLET
210+
ev |= unix.EPOLLET | unix.EPOLLRDHUP
217211
}
218212
return os.NewSyscallError("epoll_ctl add",
219213
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
220214
}
221215

222216
// AddWrite registers the given file-descriptor with writable event to the poller.
223217
func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
224-
var ev uint32 = writeEvents
218+
var ev uint32 = WriteEvents
225219
if edgeTriggered {
226-
ev |= unix.EPOLLET
220+
ev |= unix.EPOLLET | unix.EPOLLRDHUP
227221
}
228222
return os.NewSyscallError("epoll_ctl add",
229223
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
230224
}
231225

232226
// ModRead renews the given file-descriptor with readable event in the poller.
233227
func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
234-
var ev uint32 = readEvents
228+
var ev uint32 = ReadEvents
235229
if edgeTriggered {
236-
ev |= unix.EPOLLET
230+
ev |= unix.EPOLLET | unix.EPOLLRDHUP
237231
}
238232
return os.NewSyscallError("epoll_ctl mod",
239233
unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
240234
}
241235

242236
// ModReadWrite renews the given file-descriptor with readable and writable events in the poller.
243237
func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error {
244-
var ev uint32 = readWriteEvents
238+
var ev uint32 = ReadWriteEvents
245239
if edgeTriggered {
246-
ev |= unix.EPOLLET
240+
ev |= unix.EPOLLET | unix.EPOLLRDHUP
247241
}
248242
return os.NewSyscallError("epoll_ctl mod",
249243
unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))

internal/netpoll/poller_epoll_ultimate.go

+10-16
Original file line numberDiff line numberDiff line change
@@ -195,18 +195,12 @@ func (p *Poller) Polling() error {
195195
}
196196
}
197197

198-
const (
199-
readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP
200-
writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP
201-
readWriteEvents = readEvents | writeEvents
202-
)
203-
204198
// AddReadWrite registers the given file-descriptor with readable and writable events to the poller.
205199
func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
206200
var ev epollevent
207-
ev.events = readWriteEvents
201+
ev.events = ReadWriteEvents
208202
if edgeTriggered {
209-
ev.events |= unix.EPOLLET
203+
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
210204
}
211205
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
212206
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
@@ -215,9 +209,9 @@ func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
215209
// AddRead registers the given file-descriptor with readable event to the poller.
216210
func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
217211
var ev epollevent
218-
ev.events = readEvents
212+
ev.events = ReadEvents
219213
if edgeTriggered {
220-
ev.events |= unix.EPOLLET
214+
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
221215
}
222216
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
223217
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
@@ -226,9 +220,9 @@ func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
226220
// AddWrite registers the given file-descriptor with writable event to the poller.
227221
func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
228222
var ev epollevent
229-
ev.events = writeEvents
223+
ev.events = WriteEvents
230224
if edgeTriggered {
231-
ev.events |= unix.EPOLLET
225+
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
232226
}
233227
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
234228
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
@@ -237,9 +231,9 @@ func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
237231
// ModRead renews the given file-descriptor with readable event in the poller.
238232
func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
239233
var ev epollevent
240-
ev.events = readEvents
234+
ev.events = ReadEvents
241235
if edgeTriggered {
242-
ev.events |= unix.EPOLLET
236+
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
243237
}
244238
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
245239
return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev))
@@ -248,9 +242,9 @@ func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
248242
// ModReadWrite renews the given file-descriptor with readable and writable events in the poller.
249243
func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error {
250244
var ev epollevent
251-
ev.events = readWriteEvents
245+
ev.events = ReadWriteEvents
252246
if edgeTriggered {
253-
ev.events |= unix.EPOLLET
247+
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
254248
}
255249
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
256250
return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev))

0 commit comments

Comments
 (0)