Skip to content

Commit 48fe835

Browse files
authored
Merge pull request #54 from Allenxuxu/feat-ringbuf-len
add buffer length
2 parents d13351d + dd6a2d8 commit 48fe835

File tree

2 files changed

+129
-8
lines changed

2 files changed

+129
-8
lines changed

connection/connection.go

+23-8
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@ type CallBack interface {
2525

2626
// Connection TCP 连接
2727
type Connection struct {
28-
fd int
29-
connected atomic.Bool
30-
outBuffer *ringbuffer.RingBuffer // write buffer
31-
inBuffer *ringbuffer.RingBuffer // read buffer
32-
callBack CallBack
33-
loop *eventloop.EventLoop
34-
peerAddr string
35-
ctx interface{}
28+
fd int
29+
connected atomic.Bool
30+
outBuffer *ringbuffer.RingBuffer // write buffer
31+
outBufferLen atomic.Int64
32+
inBuffer *ringbuffer.RingBuffer // read buffer
33+
inBufferLen atomic.Int64
34+
callBack CallBack
35+
loop *eventloop.EventLoop
36+
peerAddr string
37+
ctx interface{}
3638
KeyValueContext
3739

3840
idleTime time.Duration
@@ -129,6 +131,16 @@ func (c *Connection) ShutdownWrite() error {
129131
return unix.Shutdown(c.fd, unix.SHUT_WR)
130132
}
131133

134+
// ReadBufferLength read buffer 当前积压的数据长度
135+
func (c *Connection) ReadBufferLength() int64 {
136+
return c.inBufferLen.Get()
137+
}
138+
139+
// WriteBufferLength write buffer 当前积压的数据长度
140+
func (c *Connection) WriteBufferLength() int64 {
141+
return c.outBufferLen.Get()
142+
}
143+
132144
// HandleEvent 内部使用,event loop 回调
133145
func (c *Connection) HandleEvent(fd int, events poller.Event) {
134146
if c.idleTime > 0 {
@@ -147,6 +159,9 @@ func (c *Connection) HandleEvent(fd int, events poller.Event) {
147159
} else if events&poller.EventRead != 0 {
148160
c.handleRead(fd)
149161
}
162+
163+
c.inBufferLen.Swap(int64(c.inBuffer.Length()))
164+
c.outBufferLen.Swap(int64(c.outBuffer.Length()))
150165
}
151166

152167
func (c *Connection) handlerProtocol(buffer *ringbuffer.RingBuffer) []byte {

example/bufferlength/main.go

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package main
2+
3+
import (
4+
"container/list"
5+
"log"
6+
"sync"
7+
"time"
8+
9+
"github.com/Allenxuxu/gev"
10+
"github.com/Allenxuxu/gev/connection"
11+
)
12+
13+
const clientsKey = "demo_push_message_key"
14+
15+
// Server example
16+
type Server struct {
17+
conn *list.List
18+
mu sync.RWMutex
19+
server *gev.Server
20+
}
21+
22+
// New server
23+
func New(ip, port string) (*Server, error) {
24+
var err error
25+
s := new(Server)
26+
s.conn = list.New()
27+
s.server, err = gev.NewServer(s,
28+
gev.Address(ip+":"+port))
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
return s, nil
34+
}
35+
36+
// Start server
37+
func (s *Server) Start() {
38+
s.server.RunEvery(1*time.Second, s.RunPush)
39+
s.server.Start()
40+
}
41+
42+
// Stop server
43+
func (s *Server) Stop() {
44+
s.server.Stop()
45+
}
46+
47+
// RunPush push message
48+
func (s *Server) RunPush() {
49+
var next *list.Element
50+
51+
s.mu.RLock()
52+
defer s.mu.RUnlock()
53+
54+
for e := s.conn.Front(); e != nil; e = next {
55+
next = e.Next()
56+
57+
c := e.Value.(*connection.Connection)
58+
if c.WriteBufferLength() > 1024*10 {
59+
log.Printf("write buffer length > 1024*10")
60+
continue
61+
}
62+
_ = c.Send([]byte("hello\n"))
63+
}
64+
}
65+
66+
// OnConnect callback
67+
func (s *Server) OnConnect(c *connection.Connection) {
68+
log.Println(" OnConnect : ", c.PeerAddr())
69+
70+
s.mu.Lock()
71+
e := s.conn.PushBack(c)
72+
s.mu.Unlock()
73+
c.Set(clientsKey, e)
74+
}
75+
76+
// OnMessage callback
77+
func (s *Server) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
78+
log.Printf("OnMessage, read buffer len %d, write buffer len %d \n", c.ReadBufferLength(), c.WriteBufferLength())
79+
80+
out = data
81+
return
82+
}
83+
84+
// OnClose callback
85+
func (s *Server) OnClose(c *connection.Connection) {
86+
log.Println("OnClose")
87+
v, ok := c.Get(clientsKey)
88+
if !ok {
89+
log.Println("OnClose : get key fail")
90+
return
91+
}
92+
93+
s.mu.Lock()
94+
s.conn.Remove(v.(*list.Element))
95+
s.mu.Unlock()
96+
}
97+
98+
func main() {
99+
s, err := New("", "1833")
100+
if err != nil {
101+
panic(err)
102+
}
103+
defer s.Stop()
104+
105+
s.Start()
106+
}

0 commit comments

Comments
 (0)