Skip to content

Commit 8650d11

Browse files
author
Gijs Peskens
authored
Centralized pollserver (#38)
This pollserver design is based on the Golang netpoll code. Using a centralized pollserver allows us to block on golang primitives instead of cgo calls for all calls that would block. Also it decreases the number of open filedescriptors by a factor of 3 (previously each socket opened 1 normal fd and 2 poll fd's).
1 parent ef76f63 commit 8650d11

File tree

7 files changed

+585
-144
lines changed

7 files changed

+585
-144
lines changed

accept.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package srtgo
2+
3+
/*
4+
#cgo LDFLAGS: -lsrt
5+
#include <srt/srt.h>
6+
7+
SRTSOCKET srt_accept_wrapped(SRTSOCKET lsn, struct sockaddr* addr, int* addrlen, int *srterror, int *syserror)
8+
{
9+
int ret = srt_accept(lsn, addr, addrlen);
10+
if (ret < 0) {
11+
*srterror = srt_getlasterror(syserror);
12+
}
13+
return ret;
14+
}
15+
16+
*/
17+
import "C"
18+
import (
19+
"fmt"
20+
"net"
21+
"syscall"
22+
"unsafe"
23+
)
24+
25+
func srtAcceptImpl(lsn C.SRTSOCKET, addr *C.struct_sockaddr, addrlen *C.int) (C.SRTSOCKET, error) {
26+
srterr := C.int(0)
27+
syserr := C.int(0)
28+
socket := C.srt_accept_wrapped(lsn, addr, addrlen, &srterr, &syserr)
29+
if srterr != 0 {
30+
srterror := SRTErrno(srterr)
31+
if syserr < 0 {
32+
srterror.wrapSysErr(syscall.Errno(syserr))
33+
}
34+
return socket, srterror
35+
}
36+
return socket, nil
37+
}
38+
39+
// Accept an incoming connection
40+
func (s SrtSocket) Accept() (*SrtSocket, *net.UDPAddr, error) {
41+
var err error
42+
if !s.blocking {
43+
err = s.pd.wait(ModeRead)
44+
if err != nil {
45+
return nil, nil, err
46+
}
47+
}
48+
var addr syscall.RawSockaddrAny
49+
sclen := C.int(syscall.SizeofSockaddrAny)
50+
socket, err := srtAcceptImpl(s.socket, (*C.struct_sockaddr)(unsafe.Pointer(&addr)), &sclen)
51+
if err != nil {
52+
return nil, nil, err
53+
}
54+
if socket == SRT_INVALID_SOCK {
55+
return nil, nil, fmt.Errorf("srt accept, error accepting the connection: %w", srtGetAndClearError())
56+
}
57+
58+
newSocket, err := newFromSocket(&s, socket)
59+
if err != nil {
60+
return nil, nil, fmt.Errorf("new socket could not be created: %w", err)
61+
}
62+
63+
udpAddr, err := udpAddrFromSockaddr(&addr)
64+
if err != nil {
65+
return nil, nil, err
66+
}
67+
68+
return newSocket, udpAddr, nil
69+
}

poll.go

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
package srtgo
2+
3+
/*
4+
#cgo LDFLAGS: -lsrt
5+
#include <srt/srt.h>
6+
*/
7+
import "C"
8+
import (
9+
"sync"
10+
"sync/atomic"
11+
"time"
12+
)
13+
14+
const (
15+
pollDefault = int32(iota)
16+
pollReady = int32(iota)
17+
pollWait = int32(iota)
18+
)
19+
20+
type PollMode int
21+
22+
const (
23+
ModeRead = PollMode(iota)
24+
ModeWrite
25+
)
26+
27+
/*
28+
pollDesc contains the polling state for the associated SrtSocket
29+
closing: socket is closing, reject all poll operations
30+
pollErr: an error occured on the socket, indicates it's not useable anymore.
31+
unblockRd: is used to unblock the poller when the socket becomes ready for io
32+
rdState: polling state for read operations
33+
rdDeadline: deadline in NS before poll operation times out, -1 means timedout (needs to be cleared), 0 is without timeout
34+
rdSeq: sequence number protects against spurious signalling of timeouts when timer is reset.
35+
rdTimer: timer used to enforce deadline.
36+
*/
37+
type pollDesc struct {
38+
lock sync.Mutex
39+
closing bool
40+
fd C.SRTSOCKET
41+
pollErr bool
42+
unblockRd chan interface{}
43+
rdState int32
44+
rdLock sync.Mutex
45+
rdDeadline int64
46+
rdSeq int64
47+
rdTimer *time.Timer
48+
rtSeq int64
49+
unblockWr chan interface{}
50+
wrState int32
51+
wrLock sync.Mutex
52+
wdDeadline int64
53+
wdSeq int64
54+
wdTimer *time.Timer
55+
wtSeq int64
56+
pollS *pollServer
57+
}
58+
59+
var pdPool = sync.Pool{
60+
New: func() interface{} {
61+
return &pollDesc{
62+
unblockRd: make(chan interface{}),
63+
unblockWr: make(chan interface{}),
64+
rdTimer: time.NewTimer(0),
65+
wdTimer: time.NewTimer(0),
66+
}
67+
},
68+
}
69+
70+
func pollDescInit(s C.SRTSOCKET) *pollDesc {
71+
pd := pdPool.Get().(*pollDesc)
72+
pd.lock.Lock()
73+
defer pd.lock.Unlock()
74+
pd.fd = s
75+
pd.rdState = pollDefault
76+
pd.wrState = pollDefault
77+
pd.pollS = pollServerCtx()
78+
pd.closing = false
79+
pd.pollErr = false
80+
pd.rdSeq++
81+
pd.wdSeq++
82+
pd.pollS.pollOpen(pd)
83+
return pd
84+
}
85+
86+
func (pd *pollDesc) release() {
87+
pd.lock.Lock()
88+
defer pd.lock.Unlock()
89+
if !pd.closing || pd.rdState == pollWait || pd.wrState == pollWait {
90+
panic("returning open or blocked upon pollDesc")
91+
}
92+
pd.fd = 0
93+
pdPool.Put(pd)
94+
}
95+
96+
func (pd *pollDesc) wait(mode PollMode) error {
97+
defer pd.reset(mode)
98+
if err := pd.checkPollErr(mode); err != nil {
99+
return err
100+
}
101+
state := &pd.rdState
102+
unblockChan := pd.unblockRd
103+
expiryChan := pd.rdTimer.C
104+
timerSeq := int64(0)
105+
if mode == ModeRead {
106+
pd.lock.Lock()
107+
timerSeq = pd.rtSeq
108+
pd.lock.Unlock()
109+
pd.rdLock.Lock()
110+
defer pd.rdLock.Unlock()
111+
} else if mode == ModeWrite {
112+
pd.lock.Lock()
113+
timerSeq = pd.wtSeq
114+
pd.lock.Unlock()
115+
state = &pd.wrState
116+
unblockChan = pd.unblockWr
117+
expiryChan = pd.rdTimer.C
118+
pd.wrLock.Lock()
119+
defer pd.wrLock.Unlock()
120+
}
121+
122+
for {
123+
old := *state
124+
if old == pollReady {
125+
*state = pollDefault
126+
return nil
127+
}
128+
if atomic.CompareAndSwapInt32(state, pollDefault, pollWait) {
129+
break
130+
}
131+
}
132+
133+
wait:
134+
for {
135+
select {
136+
case <-unblockChan:
137+
break wait
138+
case <-expiryChan:
139+
pd.lock.Lock()
140+
if mode == ModeRead {
141+
if timerSeq == pd.rdSeq {
142+
pd.rdDeadline = -1
143+
pd.lock.Unlock()
144+
break wait
145+
}
146+
timerSeq = pd.rtSeq
147+
}
148+
if mode == ModeWrite {
149+
if timerSeq == pd.wdSeq {
150+
pd.wdDeadline = -1
151+
pd.lock.Unlock()
152+
break wait
153+
}
154+
timerSeq = pd.wtSeq
155+
}
156+
pd.lock.Unlock()
157+
}
158+
}
159+
err := pd.checkPollErr(mode)
160+
return err
161+
}
162+
163+
func (pd *pollDesc) close() {
164+
pd.lock.Lock()
165+
defer pd.lock.Unlock()
166+
if pd.closing {
167+
return
168+
}
169+
pd.closing = true
170+
pd.pollS.pollClose(pd)
171+
}
172+
173+
func (pd *pollDesc) checkPollErr(mode PollMode) error {
174+
pd.lock.Lock()
175+
defer pd.lock.Unlock()
176+
if pd.closing {
177+
return &SrtSocketClosed{}
178+
}
179+
180+
if mode == ModeRead && pd.rdDeadline < 0 || mode == ModeWrite && pd.wdDeadline < 0 {
181+
return &SrtEpollTimeout{}
182+
}
183+
184+
if pd.pollErr {
185+
return &SrtSocketClosed{}
186+
}
187+
188+
return nil
189+
}
190+
191+
func (pd *pollDesc) setDeadline(t time.Time, mode PollMode) {
192+
pd.lock.Lock()
193+
defer pd.lock.Unlock()
194+
var d int64
195+
if !t.IsZero() {
196+
d = int64(time.Until(t))
197+
if d == 0 {
198+
d = -1
199+
}
200+
}
201+
if mode == ModeRead || mode == ModeRead+ModeWrite {
202+
pd.rdSeq++
203+
pd.rtSeq = pd.rdSeq
204+
if pd.rdDeadline > 0 {
205+
pd.rdTimer.Stop()
206+
}
207+
pd.rdDeadline = d
208+
if d > 0 {
209+
pd.rdTimer.Reset(time.Duration(d))
210+
}
211+
if d < 0 {
212+
pd.unblock(ModeRead, false, false)
213+
}
214+
}
215+
if mode == ModeWrite || mode == ModeRead+ModeWrite {
216+
pd.wdSeq++
217+
pd.wtSeq = pd.wdSeq
218+
if pd.wdDeadline > 0 {
219+
pd.wdTimer.Stop()
220+
}
221+
pd.wdDeadline = d
222+
if d > 0 {
223+
pd.wdTimer.Reset(time.Duration(d))
224+
}
225+
if d < 0 {
226+
pd.unblock(ModeWrite, false, false)
227+
}
228+
}
229+
}
230+
231+
func (pd *pollDesc) unblock(mode PollMode, pollerr, ioready bool) {
232+
if pollerr {
233+
pd.lock.Lock()
234+
pd.pollErr = pollerr
235+
pd.lock.Unlock()
236+
}
237+
state := &pd.rdState
238+
unblockChan := pd.unblockRd
239+
if mode == ModeWrite {
240+
state = &pd.wrState
241+
unblockChan = pd.unblockWr
242+
}
243+
old := atomic.LoadInt32(state)
244+
if ioready {
245+
atomic.StoreInt32(state, pollReady)
246+
}
247+
if old == pollWait {
248+
//make sure we never block here
249+
select {
250+
case unblockChan <- struct{}{}:
251+
//
252+
default:
253+
//
254+
}
255+
}
256+
}
257+
258+
func (pd *pollDesc) reset(mode PollMode) {
259+
if mode == ModeRead {
260+
pd.rdLock.Lock()
261+
pd.rdState = pollDefault
262+
pd.rdLock.Unlock()
263+
} else if mode == ModeWrite {
264+
pd.wrLock.Lock()
265+
pd.wrState = pollDefault
266+
pd.wrLock.Unlock()
267+
}
268+
}

0 commit comments

Comments
 (0)