Skip to content

Commit 7c104a5

Browse files
committed
add WriteBuffers function to send a vector of slice in batch
1 parent 59b7c4b commit 7c104a5

File tree

2 files changed

+86
-0
lines changed

2 files changed

+86
-0
lines changed

sess.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,67 @@ func (s *UDPSession) Write(b []byte) (n int, err error) {
304304
}
305305
}
306306

307+
// WriteBuffers write a vector of byte slices to the underlying connection
308+
func (s *UDPSession) WriteBuffers(v [][]byte) (n int, err error) {
309+
for {
310+
s.mu.Lock()
311+
if s.isClosed {
312+
s.mu.Unlock()
313+
return 0, errors.New(errBrokenPipe)
314+
}
315+
316+
if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
317+
for _, b := range v {
318+
n += len(b)
319+
for {
320+
if len(b) <= int(s.kcp.mss) {
321+
s.kcp.Send(b)
322+
break
323+
} else {
324+
s.kcp.Send(b[:s.kcp.mss])
325+
b = b[s.kcp.mss:]
326+
}
327+
}
328+
}
329+
330+
if s.kcp.WaitSnd() >= int(s.kcp.snd_wnd) || !s.writeDelay {
331+
s.kcp.flush(false)
332+
}
333+
s.mu.Unlock()
334+
atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
335+
return n, nil
336+
}
337+
338+
var timeout *time.Timer
339+
var c <-chan time.Time
340+
if !s.wd.IsZero() {
341+
if time.Now().After(s.wd) {
342+
s.mu.Unlock()
343+
return 0, errTimeout{}
344+
}
345+
delay := s.wd.Sub(time.Now())
346+
timeout = time.NewTimer(delay)
347+
c = timeout.C
348+
}
349+
s.mu.Unlock()
350+
351+
select {
352+
case <-s.chWriteEvent:
353+
case <-c:
354+
case <-s.die:
355+
case err = <-s.chWriteError:
356+
if timeout != nil {
357+
timeout.Stop()
358+
}
359+
return n, err
360+
}
361+
362+
if timeout != nil {
363+
timeout.Stop()
364+
}
365+
}
366+
}
367+
307368
// Close closes the connection.
308369
func (s *UDPSession) Close() error {
309370
// remove current session from updater & listener(if necessary)

sess_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,31 @@ func TestSendRecv(t *testing.T) {
272272
cli.Close()
273273
}
274274

275+
func TestSendVector(t *testing.T) {
276+
cli, err := dialEcho()
277+
if err != nil {
278+
panic(err)
279+
}
280+
cli.SetWriteDelay(false)
281+
const N = 100
282+
buf := make([]byte, 20)
283+
v := make([][]byte, 2)
284+
for i := 0; i < N; i++ {
285+
v[0] = []byte(fmt.Sprintf("hello%v", i))
286+
v[1] = []byte(fmt.Sprintf("world%v", i))
287+
msg := fmt.Sprintf("hello%vworld%v", i, i)
288+
cli.WriteBuffers(v)
289+
if n, err := cli.Read(buf); err == nil {
290+
if string(buf[:n]) != msg {
291+
t.Error(string(buf[:n]), msg)
292+
}
293+
} else {
294+
panic(err)
295+
}
296+
}
297+
cli.Close()
298+
}
299+
275300
func TestTinyBufferReceiver(t *testing.T) {
276301
cli, err := dialTinyBufferEcho()
277302
if err != nil {

0 commit comments

Comments
 (0)