Skip to content

Commit 0941a27

Browse files
committed
lock only once for kcpInput()
1 parent fa77ab6 commit 0941a27

File tree

1 file changed

+35
-23
lines changed

1 file changed

+35
-23
lines changed

sess.go

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -546,18 +546,25 @@ func (s *UDPSession) notifyWriteEvent() {
546546
}
547547

548548
func (s *UDPSession) kcpInput(data []byte) {
549+
kcpInErrors := uint64(0)
550+
fecErrs := uint64(0)
551+
fecRecovered := uint64(0)
552+
549553
if s.fec != nil {
550554
f := s.fec.decode(data)
555+
s.mu.Lock()
556+
if f.flag == typeData {
557+
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true); ret != 0 {
558+
atomic.AddUint64(&DefaultSnmp.KCPInErrors, 1)
559+
}
560+
}
561+
551562
if f.flag == typeData || f.flag == typeFEC {
552563
if f.flag == typeFEC {
553564
atomic.AddUint64(&DefaultSnmp.FECSegs, 1)
554565
}
555566

556567
if recovers := s.fec.input(f); recovers != nil {
557-
s.mu.Lock()
558-
kcpInErrors := uint64(0)
559-
fecErrs := uint64(0)
560-
fecRecovered := uint64(0)
561568
for _, r := range recovers {
562569
if len(r) >= 2 { // must be larger than 2bytes
563570
sz := binary.LittleEndian.Uint16(r)
@@ -574,38 +581,43 @@ func (s *UDPSession) kcpInput(data []byte) {
574581
fecErrs++
575582
}
576583
}
577-
s.mu.Unlock()
578-
atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
579-
atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
580-
atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
581584
}
582585
}
583-
if f.flag == typeData {
584-
s.mu.Lock()
585-
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true); ret != 0 {
586-
atomic.AddUint64(&DefaultSnmp.KCPInErrors, 1)
587-
}
588-
s.mu.Unlock()
586+
587+
// notify reader
588+
if n := s.kcp.PeekSize(); n > 0 {
589+
s.notifyReadEvent()
590+
}
591+
if s.ackNoDelay {
592+
s.kcp.flush()
589593
}
594+
s.mu.Unlock()
590595
} else {
591596
s.mu.Lock()
592597
if ret := s.kcp.Input(data, true); ret != 0 {
593598
atomic.AddUint64(&DefaultSnmp.KCPInErrors, 1)
594599
}
600+
// notify reader
601+
if n := s.kcp.PeekSize(); n > 0 {
602+
s.notifyReadEvent()
603+
}
604+
if s.ackNoDelay {
605+
s.kcp.flush()
606+
}
595607
s.mu.Unlock()
596608
}
597609

598-
// notify reader
599-
s.mu.Lock()
600-
if n := s.kcp.PeekSize(); n > 0 {
601-
s.notifyReadEvent()
602-
}
603-
if s.ackNoDelay {
604-
s.kcp.flush()
605-
}
606-
s.mu.Unlock()
607610
atomic.AddUint64(&DefaultSnmp.InSegs, 1)
608611
atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
612+
if kcpInErrors > 0 {
613+
atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
614+
}
615+
if fecErrs > 0 {
616+
atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
617+
}
618+
if fecRecovered > 0 {
619+
atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
620+
}
609621
}
610622

611623
func (s *UDPSession) receiver(ch chan []byte) {

0 commit comments

Comments
 (0)