Skip to content

Commit 9a2032f

Browse files
committed
feat: implement the gnet.Conn.AsyncWritev()
Fixes #245
1 parent aa0254f commit 9a2032f

File tree

5 files changed

+164
-39
lines changed

5 files changed

+164
-39
lines changed

connection_unix.go

+68
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"golang.org/x/sys/unix"
2626

27+
"github.com/panjf2000/gnet/internal/io"
2728
"github.com/panjf2000/gnet/internal/netpoll"
2829
"github.com/panjf2000/gnet/internal/socket"
2930
"github.com/panjf2000/gnet/pkg/mixedbuffer"
@@ -39,6 +40,7 @@ type conn struct {
3940
loop *eventloop // connected event-loop
4041
codec ICodec // codec for TCP
4142
opened bool // connection opened event fired
43+
packets [][]byte // reuse it for multiple byte slices
4244
localAddr net.Addr // local addr
4345
remoteAddr net.Addr // remote addr
4446
inboundBuffer *ringbuffer.RingBuffer // buffer for leftover data from the peer
@@ -74,6 +76,7 @@ func (c *conn) releaseTCP() {
7476
c.outboundBuffer.Release()
7577
netpoll.PutPollAttachment(c.pollAttachment)
7678
c.pollAttachment = nil
79+
c.packets = c.packets[:0]
7780
}
7881

7982
func newUDPConn(fd int, el *eventloop, localAddr net.Addr, sa unix.Sockaddr, connected bool) (c *conn) {
@@ -156,13 +159,74 @@ func (c *conn) write(buf []byte) (err error) {
156159
return
157160
}
158161

162+
func (c *conn) writev(bs [][]byte) (err error) {
163+
defer func() {
164+
for _, b := range bs {
165+
c.loop.eventHandler.AfterWrite(c, b)
166+
}
167+
c.packets = c.packets[:0]
168+
}()
169+
170+
var sum int
171+
for _, b := range bs {
172+
var packet []byte
173+
if packet, err = c.codec.Encode(c, b); err != nil {
174+
return
175+
}
176+
c.packets = append(c.packets, packet)
177+
sum += len(packet)
178+
c.loop.eventHandler.PreWrite(c)
179+
}
180+
181+
// If there is pending data in outbound buffer, the current data ought to be appended to the outbound buffer
182+
// for maintaining the sequence of network packets.
183+
if !c.outboundBuffer.IsEmpty() {
184+
_, _ = c.outboundBuffer.Writev(c.packets)
185+
return
186+
}
187+
188+
var n int
189+
if n, err = io.Writev(c.fd, c.packets); err != nil {
190+
// A temporary error occurs, append the data to outbound buffer, writing it back to the peer in the next round.
191+
if err == unix.EAGAIN {
192+
_, _ = c.outboundBuffer.Writev(c.packets)
193+
err = c.loop.poller.ModReadWrite(c.pollAttachment)
194+
return
195+
}
196+
return c.loop.loopCloseConn(c, os.NewSyscallError("write", err))
197+
}
198+
// Failed to send all data back to the peer, buffer the leftover data for the next round.
199+
if n < sum {
200+
var pos int
201+
for i := range c.packets {
202+
np := len(c.packets[i])
203+
if n < np {
204+
pos = i
205+
c.packets[i] = c.packets[i][n:]
206+
break
207+
}
208+
n -= np
209+
}
210+
_, _ = c.outboundBuffer.Writev(c.packets[pos:])
211+
err = c.loop.poller.ModReadWrite(c.pollAttachment)
212+
}
213+
return
214+
}
215+
159216
func (c *conn) asyncWrite(itf interface{}) error {
160217
if !c.opened {
161218
return nil
162219
}
163220
return c.write(itf.([]byte))
164221
}
165222

223+
func (c *conn) asyncWritev(itf interface{}) error {
224+
if !c.opened {
225+
return nil
226+
}
227+
return c.writev(itf.([][]byte))
228+
}
229+
166230
func (c *conn) sendTo(buf []byte) error {
167231
c.loop.eventHandler.PreWrite(c)
168232
defer c.loop.eventHandler.AfterWrite(c, buf)
@@ -238,6 +302,10 @@ func (c *conn) AsyncWrite(buf []byte) error {
238302
return c.loop.poller.Trigger(c.asyncWrite, buf)
239303
}
240304

305+
func (c *conn) AsyncWritev(bs [][]byte) error {
306+
return c.loop.poller.Trigger(c.asyncWritev, bs)
307+
}
308+
241309
func (c *conn) SendTo(buf []byte) error {
242310
return c.sendTo(buf)
243311
}

connection_windows.go

+10
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,16 @@ func (c *stdConn) AsyncWrite(buf []byte) (err error) {
246246
return
247247
}
248248

249+
func (c *stdConn) AsyncWritev(bs [][]byte) (err error) {
250+
for _, b := range bs {
251+
err = c.AsyncWrite(b)
252+
if err != nil {
253+
return
254+
}
255+
}
256+
return
257+
}
258+
249259
func (c *stdConn) SendTo(buf []byte) (err error) {
250260
c.loop.eventHandler.PreWrite(c)
251261
_, err = c.loop.svr.ln.packetConn.WriteTo(buf, c.remoteAddr)

gnet.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,14 @@ type Conn interface {
129129
// SendTo writes data for UDP sockets, it allows you to send data back to UDP socket in individual goroutines.
130130
SendTo(buf []byte) error
131131

132-
// AsyncWrite writes data to peer asynchronously, usually you would call it in individual goroutines
132+
// AsyncWrite writes one byte slice to peer asynchronously, usually you would call it in individual goroutines
133133
// instead of the event-loop goroutines.
134134
AsyncWrite(buf []byte) error
135135

136+
// AsyncWritev writes multiple byte slices to peer asynchronously, usually you would call it in individual goroutines
137+
// instead of the event-loop goroutines.
138+
AsyncWritev(bs [][]byte) error
139+
136140
// Wake triggers a React event for the connection.
137141
Wake() error
138142

gnet_test.go

+64-38
Original file line numberDiff line numberDiff line change
@@ -323,152 +323,168 @@ func TestServe(t *testing.T) {
323323
t.Run("poll", func(t *testing.T) {
324324
t.Run("tcp", func(t *testing.T) {
325325
t.Run("1-loop", func(t *testing.T) {
326-
testServe(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
326+
testServe(t, "tcp", ":9991", false, false, false, false, false, 10, RoundRobin)
327327
})
328328
t.Run("N-loop", func(t *testing.T) {
329-
testServe(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
329+
testServe(t, "tcp", ":9992", false, false, true, false, false, 10, LeastConnections)
330330
})
331331
})
332332
t.Run("tcp-async", func(t *testing.T) {
333333
t.Run("1-loop", func(t *testing.T) {
334-
testServe(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
334+
testServe(t, "tcp", ":9991", false, false, false, true, false, 10, RoundRobin)
335335
})
336336
t.Run("N-loop", func(t *testing.T) {
337-
testServe(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
337+
testServe(t, "tcp", ":9992", false, false, true, true, false, 10, LeastConnections)
338+
})
339+
})
340+
t.Run("tcp-async-writev", func(t *testing.T) {
341+
t.Run("1-loop", func(t *testing.T) {
342+
testServe(t, "tcp", ":9991", false, false, false, true, true, 10, RoundRobin)
343+
})
344+
t.Run("N-loop", func(t *testing.T) {
345+
testServe(t, "tcp", ":9992", false, false, true, true, true, 10, LeastConnections)
338346
})
339347
})
340348
t.Run("udp", func(t *testing.T) {
341349
t.Run("1-loop", func(t *testing.T) {
342-
testServe(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
350+
testServe(t, "udp", ":9991", false, false, false, false, false, 10, RoundRobin)
343351
})
344352
t.Run("N-loop", func(t *testing.T) {
345-
testServe(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
353+
testServe(t, "udp", ":9992", false, false, true, false, false, 10, LeastConnections)
346354
})
347355
})
348356
t.Run("udp-async", func(t *testing.T) {
349357
t.Run("1-loop", func(t *testing.T) {
350-
testServe(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
358+
testServe(t, "udp", ":9991", false, false, false, true, false, 10, RoundRobin)
351359
})
352360
t.Run("N-loop", func(t *testing.T) {
353-
testServe(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
361+
testServe(t, "udp", ":9992", false, false, true, true, false, 10, LeastConnections)
354362
})
355363
})
356364
t.Run("unix", func(t *testing.T) {
357365
t.Run("1-loop", func(t *testing.T) {
358-
testServe(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
366+
testServe(t, "unix", "gnet1.sock", false, false, false, false, false, 10, RoundRobin)
359367
})
360368
t.Run("N-loop", func(t *testing.T) {
361-
testServe(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
369+
testServe(t, "unix", "gnet2.sock", false, false, true, false, false, 10, SourceAddrHash)
362370
})
363371
})
364372
t.Run("unix-async", func(t *testing.T) {
365373
t.Run("1-loop", func(t *testing.T) {
366-
testServe(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
374+
testServe(t, "unix", "gnet1.sock", false, false, false, true, false, 10, RoundRobin)
375+
})
376+
t.Run("N-loop", func(t *testing.T) {
377+
testServe(t, "unix", "gnet2.sock", false, false, true, true, false, 10, SourceAddrHash)
378+
})
379+
})
380+
t.Run("unix-async-writev", func(t *testing.T) {
381+
t.Run("1-loop", func(t *testing.T) {
382+
testServe(t, "unix", "gnet1.sock", false, false, false, true, true, 10, RoundRobin)
367383
})
368384
t.Run("N-loop", func(t *testing.T) {
369-
testServe(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
385+
testServe(t, "unix", "gnet2.sock", false, false, true, true, true, 10, SourceAddrHash)
370386
})
371387
})
372388
})
373389

374390
t.Run("poll-reuseport", func(t *testing.T) {
375391
t.Run("tcp", func(t *testing.T) {
376392
t.Run("1-loop", func(t *testing.T) {
377-
testServe(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
393+
testServe(t, "tcp", ":9991", true, true, false, false, false, 10, RoundRobin)
378394
})
379395
t.Run("N-loop", func(t *testing.T) {
380-
testServe(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
396+
testServe(t, "tcp", ":9992", true, true, true, false, false, 10, LeastConnections)
381397
})
382398
})
383399
t.Run("tcp-async", func(t *testing.T) {
384400
t.Run("1-loop", func(t *testing.T) {
385-
testServe(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
401+
testServe(t, "tcp", ":9991", true, true, false, true, false, 10, RoundRobin)
386402
})
387403
t.Run("N-loop", func(t *testing.T) {
388-
testServe(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
404+
testServe(t, "tcp", ":9992", true, true, true, false, false, 10, LeastConnections)
389405
})
390406
})
391407
t.Run("udp", func(t *testing.T) {
392408
t.Run("1-loop", func(t *testing.T) {
393-
testServe(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
409+
testServe(t, "udp", ":9991", true, true, false, false, false, 10, RoundRobin)
394410
})
395411
t.Run("N-loop", func(t *testing.T) {
396-
testServe(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
412+
testServe(t, "udp", ":9992", true, true, true, false, false, 10, LeastConnections)
397413
})
398414
})
399415
t.Run("udp-async", func(t *testing.T) {
400416
t.Run("1-loop", func(t *testing.T) {
401-
testServe(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
417+
testServe(t, "udp", ":9991", true, true, false, false, false, 10, RoundRobin)
402418
})
403419
t.Run("N-loop", func(t *testing.T) {
404-
testServe(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
420+
testServe(t, "udp", ":9992", true, true, true, true, false, 10, LeastConnections)
405421
})
406422
})
407423
t.Run("unix", func(t *testing.T) {
408424
t.Run("1-loop", func(t *testing.T) {
409-
testServe(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
425+
testServe(t, "unix", "gnet1.sock", true, true, false, false, false, 10, RoundRobin)
410426
})
411427
t.Run("N-loop", func(t *testing.T) {
412-
testServe(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
428+
testServe(t, "unix", "gnet2.sock", true, true, true, false, false, 10, LeastConnections)
413429
})
414430
})
415431
t.Run("unix-async", func(t *testing.T) {
416432
t.Run("1-loop", func(t *testing.T) {
417-
testServe(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
433+
testServe(t, "unix", "gnet1.sock", true, true, false, true, false, 10, RoundRobin)
418434
})
419435
t.Run("N-loop", func(t *testing.T) {
420-
testServe(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
436+
testServe(t, "unix", "gnet2.sock", true, true, true, true, false, 10, LeastConnections)
421437
})
422438
})
423439
})
424440

425441
t.Run("poll-reuseaddr", func(t *testing.T) {
426442
t.Run("tcp", func(t *testing.T) {
427443
t.Run("1-loop", func(t *testing.T) {
428-
testServe(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin)
444+
testServe(t, "tcp", ":9991", false, true, false, false, false, 10, RoundRobin)
429445
})
430446
t.Run("N-loop", func(t *testing.T) {
431-
testServe(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
447+
testServe(t, "tcp", ":9992", false, true, true, false, false, 10, LeastConnections)
432448
})
433449
})
434450
t.Run("tcp-async", func(t *testing.T) {
435451
t.Run("1-loop", func(t *testing.T) {
436-
testServe(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin)
452+
testServe(t, "tcp", ":9991", false, true, false, true, false, 10, RoundRobin)
437453
})
438454
t.Run("N-loop", func(t *testing.T) {
439-
testServe(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
455+
testServe(t, "tcp", ":9992", false, true, true, false, false, 10, LeastConnections)
440456
})
441457
})
442458
t.Run("udp", func(t *testing.T) {
443459
t.Run("1-loop", func(t *testing.T) {
444-
testServe(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
460+
testServe(t, "udp", ":9991", false, true, false, false, false, 10, RoundRobin)
445461
})
446462
t.Run("N-loop", func(t *testing.T) {
447-
testServe(t, "udp", ":9992", false, true, true, false, 10, LeastConnections)
463+
testServe(t, "udp", ":9992", false, true, true, false, false, 10, LeastConnections)
448464
})
449465
})
450466
t.Run("udp-async", func(t *testing.T) {
451467
t.Run("1-loop", func(t *testing.T) {
452-
testServe(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
468+
testServe(t, "udp", ":9991", false, true, false, false, false, 10, RoundRobin)
453469
})
454470
t.Run("N-loop", func(t *testing.T) {
455-
testServe(t, "udp", ":9992", false, true, true, true, 10, LeastConnections)
471+
testServe(t, "udp", ":9992", false, true, true, true, false, 10, LeastConnections)
456472
})
457473
})
458474
t.Run("unix", func(t *testing.T) {
459475
t.Run("1-loop", func(t *testing.T) {
460-
testServe(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin)
476+
testServe(t, "unix", "gnet1.sock", false, true, false, false, false, 10, RoundRobin)
461477
})
462478
t.Run("N-loop", func(t *testing.T) {
463-
testServe(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections)
479+
testServe(t, "unix", "gnet2.sock", false, true, true, false, false, 10, LeastConnections)
464480
})
465481
})
466482
t.Run("unix-async", func(t *testing.T) {
467483
t.Run("1-loop", func(t *testing.T) {
468-
testServe(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin)
484+
testServe(t, "unix", "gnet1.sock", false, true, false, true, false, 10, RoundRobin)
469485
})
470486
t.Run("N-loop", func(t *testing.T) {
471-
testServe(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections)
487+
testServe(t, "unix", "gnet2.sock", false, true, true, true, false, 10, LeastConnections)
472488
})
473489
})
474490
})
@@ -482,6 +498,7 @@ type testServer struct {
482498
addr string
483499
multicore bool
484500
async bool
501+
writev bool
485502
nclients int
486503
started int32
487504
connected int32
@@ -534,7 +551,15 @@ func (s *testServer) React(packet []byte, c Conn) (out []byte, action Action) {
534551

535552
_ = s.workerPool.Submit(
536553
func() {
537-
_ = c.AsyncWrite(buf.Bytes())
554+
if s.writev {
555+
mid := buf.Len() / 2
556+
bs := make([][]byte, 2)
557+
bs[0] = buf.B[:mid]
558+
bs[1] = buf.B[mid:]
559+
_ = c.AsyncWritev(bs)
560+
} else {
561+
_ = c.AsyncWrite(buf.Bytes())
562+
}
538563
})
539564
return
540565
} else if s.network == "udp" {
@@ -568,13 +593,14 @@ func (s *testServer) Tick() (delay time.Duration, action Action) {
568593
return
569594
}
570595

571-
func testServe(t *testing.T, network, addr string, reuseport, reuseaddr, multicore, async bool, nclients int, lb LoadBalancing) {
596+
func testServe(t *testing.T, network, addr string, reuseport, reuseaddr, multicore, async, writev bool, nclients int, lb LoadBalancing) {
572597
ts := &testServer{
573598
tester: t,
574599
network: network,
575600
addr: addr,
576601
multicore: multicore,
577602
async: async,
603+
writev: writev,
578604
nclients: nclients,
579605
workerPool: goroutine.Default(),
580606
}

0 commit comments

Comments
 (0)