Skip to content

Commit c8fc2eb

Browse files
gnawux authored and
domodwyer committed
enable shrinking the socket pool size (globalsign#116)
* enable shrinking the socket pool size We found that mgo grows the socket pool during burst traffic but never closes those sockets again until the client or server is restarted. The MongoDB documentation defines two related connection-string options - [minPoolSize](https://docs.mongodb.com/manual/reference/connection-string/#urioption.minPoolSize) - [maxIdleTimeMS](https://docs.mongodb.com/manual/reference/connection-string/#urioption.maxIdleTimeMS) By implementing these two options, the pool can shrink back to minPoolSize once the sockets opened for burst traffic have been idle for longer than maxIdleTimeMS. The idea comes from https://github.com/JodeZer/mgo ; he investigated this issue and provided the initial commits. I found there were still some issues in socket maintenance, and opened a PR against his repo, JodeZer#1 . This commit includes JodeZer's commits and my fix, and I simplified the data structure. What's in this commit can be described by this figure: +------------------------+ | Session | <-------+ Add options here +------------------------+ +------------------------+ | Cluster | <-------+ Add options here +------------------------+ +------------------------+ | Server | <-------+*Add options here | | *add timestamp when recycle a socket +---+ | +-----------+ | +---+ *periodically check the unused sockets | | | shrinker <------+ and reclaim the timeout sockets. +---+ | +-----------+ | | | | | +------------------------+ | | +------------------------+ | | Socket | <-------+ Add a field for last used times+---------+ +------------------------+ Signed-off-by: Wang Xu <[email protected]> * tests for shrinking the socket pool Signed-off-by: Wang Xu <[email protected]>
1 parent 9741bfc commit c8fc2eb

File tree

5 files changed

+212
-24
lines changed

5 files changed

+212
-24
lines changed

cluster.go

+20-16
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,23 @@ import (
4848

4949
type mongoCluster struct {
5050
sync.RWMutex
51-
serverSynced sync.Cond
52-
userSeeds []string
53-
dynaSeeds []string
54-
servers mongoServers
55-
masters mongoServers
56-
references int
57-
syncing bool
58-
direct bool
59-
failFast bool
60-
syncCount uint
61-
setName string
62-
cachedIndex map[string]bool
63-
sync chan bool
64-
dial dialer
65-
appName string
51+
serverSynced sync.Cond
52+
userSeeds []string
53+
dynaSeeds []string
54+
servers mongoServers
55+
masters mongoServers
56+
references int
57+
syncing bool
58+
direct bool
59+
failFast bool
60+
syncCount uint
61+
setName string
62+
cachedIndex map[string]bool
63+
sync chan bool
64+
dial dialer
65+
appName string
66+
minPoolSize int
67+
maxIdleTimeMS int
6668
}
6769

6870
func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
@@ -437,11 +439,13 @@ func (cluster *mongoCluster) syncServersLoop() {
437439
func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
438440
cluster.RLock()
439441
server := cluster.servers.Search(tcpaddr.String())
442+
minPoolSize := cluster.minPoolSize
443+
maxIdleTimeMS := cluster.maxIdleTimeMS
440444
cluster.RUnlock()
441445
if server != nil {
442446
return server
443447
}
444-
return newServer(addr, tcpaddr, cluster.sync, cluster.dial)
448+
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS)
445449
}
446450

447451
func resolveAddr(addr string) (*net.TCPAddr, error) {

server.go

+63-8
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ type mongoServer struct {
5555
pingCount uint32
5656
closed bool
5757
abended bool
58+
minPoolSize int
59+
maxIdleTimeMS int
5860
}
5961

6062
type dialer struct {
@@ -76,17 +78,22 @@ type mongoServerInfo struct {
7678

7779
var defaultServerInfo mongoServerInfo
7880

79-
func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer {
81+
func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
8082
server := &mongoServer{
81-
Addr: addr,
82-
ResolvedAddr: tcpaddr.String(),
83-
tcpaddr: tcpaddr,
84-
sync: sync,
85-
dial: dial,
86-
info: &defaultServerInfo,
87-
pingValue: time.Hour, // Push it back before an actual ping.
83+
Addr: addr,
84+
ResolvedAddr: tcpaddr.String(),
85+
tcpaddr: tcpaddr,
86+
sync: sync,
87+
dial: dial,
88+
info: &defaultServerInfo,
89+
pingValue: time.Hour, // Push it back before an actual ping.
90+
minPoolSize: minPoolSize,
91+
maxIdleTimeMS: maxIdleTimeMS,
8892
}
8993
go server.pinger(true)
94+
if maxIdleTimeMS != 0 {
95+
go server.poolShrinker()
96+
}
9097
return server
9198
}
9299

@@ -221,6 +228,7 @@ func (server *mongoServer) close(waitForIdle bool) {
221228
func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
222229
server.Lock()
223230
if !server.closed {
231+
socket.lastTimeUsed = time.Now()
224232
server.unusedSockets = append(server.unusedSockets, socket)
225233
}
226234
server.Unlock()
@@ -346,6 +354,53 @@ func (server *mongoServer) pinger(loop bool) {
346354
}
347355
}
348356

357+
func (server *mongoServer) poolShrinker() {
358+
ticker := time.NewTicker(1 * time.Minute)
359+
for _ = range ticker.C {
360+
if server.closed {
361+
ticker.Stop()
362+
return
363+
}
364+
server.Lock()
365+
unused := len(server.unusedSockets)
366+
if unused < server.minPoolSize {
367+
server.Unlock()
368+
continue
369+
}
370+
now := time.Now()
371+
end := 0
372+
reclaimMap := map[*mongoSocket]struct{}{}
373+
// Because the acquisition and recycle are done at the tail of array,
374+
// the head is always the oldest unused socket.
375+
for _, s := range server.unusedSockets[:unused-server.minPoolSize] {
376+
if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) {
377+
break
378+
}
379+
end++
380+
reclaimMap[s] = struct{}{}
381+
}
382+
tbr := server.unusedSockets[:end]
383+
if end > 0 {
384+
next := make([]*mongoSocket, unused-end)
385+
copy(next, server.unusedSockets[end:])
386+
server.unusedSockets = next
387+
remainSockets := []*mongoSocket{}
388+
for _, s := range server.liveSockets {
389+
if _, ok := reclaimMap[s]; !ok {
390+
remainSockets = append(remainSockets, s)
391+
}
392+
}
393+
server.liveSockets = remainSockets
394+
stats.conn(-1*end, server.info.Master)
395+
}
396+
server.Unlock()
397+
398+
for _, s := range tbr {
399+
s.Close()
400+
}
401+
}
402+
}
403+
349404
type mongoServerSlice []*mongoServer
350405

351406
func (s mongoServerSlice) Len() int {

session.go

+42
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,16 @@ const (
271271
// Defines the per-server socket pool limit. Defaults to 4096.
272272
// See Session.SetPoolLimit for details.
273273
//
274+
// minPoolSize=<limit>
275+
//
276+
// Defines the per-server socket pool minimum size. Defaults to 0.
277+
//
278+
// maxIdleTimeMS=<millisecond>
279+
//
280+
// The maximum number of milliseconds that a connection can remain idle in the pool
281+
// before being removed and closed. If maxIdleTimeMS is 0, connections will never be
282+
// closed due to inactivity.
283+
//
274284
// appName=<appName>
275285
//
276286
// The identifier of this client application. This parameter is used to
@@ -322,6 +332,8 @@ func ParseURL(url string) (*DialInfo, error) {
322332
appName := ""
323333
readPreferenceMode := Primary
324334
var readPreferenceTagSets []bson.D
335+
minPoolSize := 0
336+
maxIdleTimeMS := 0
325337
for _, opt := range uinfo.options {
326338
switch opt.key {
327339
case "authSource":
@@ -368,6 +380,22 @@ func ParseURL(url string) (*DialInfo, error) {
368380
doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])})
369381
}
370382
readPreferenceTagSets = append(readPreferenceTagSets, doc)
383+
case "minPoolSize":
384+
minPoolSize, err = strconv.Atoi(opt.value)
385+
if err != nil {
386+
return nil, errors.New("bad value for minPoolSize: " + opt.value)
387+
}
388+
if minPoolSize < 0 {
389+
return nil, errors.New("bad value (negtive) for minPoolSize: " + opt.value)
390+
}
391+
case "maxIdleTimeMS":
392+
maxIdleTimeMS, err = strconv.Atoi(opt.value)
393+
if err != nil {
394+
return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value)
395+
}
396+
if maxIdleTimeMS < 0 {
397+
return nil, errors.New("bad value (negtive) for maxIdleTimeMS: " + opt.value)
398+
}
371399
case "connect":
372400
if opt.value == "direct" {
373401
direct = true
@@ -402,6 +430,8 @@ func ParseURL(url string) (*DialInfo, error) {
402430
TagSets: readPreferenceTagSets,
403431
},
404432
ReplicaSetName: setName,
433+
MinPoolSize: minPoolSize,
434+
MaxIdleTimeMS: maxIdleTimeMS,
405435
}
406436
return &info, nil
407437
}
@@ -475,6 +505,14 @@ type DialInfo struct {
475505
// cluster and establish connections with further servers too.
476506
Direct bool
477507

508+
// MinPoolSize defines the minimum number of connections in the connection pool.
509+
// Defaults to 0.
510+
MinPoolSize int
511+
512+
// MaxIdleTimeMS defines the maximum number of milliseconds that a connection
513+
// can remain idle in the pool before being removed and closed.
514+
MaxIdleTimeMS int
515+
478516
// DialServer optionally specifies the dial function for establishing
479517
// connections with the MongoDB servers.
480518
DialServer func(addr *ServerAddr) (net.Conn, error)
@@ -554,6 +592,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
554592
if info.PoolLimit > 0 {
555593
session.poolLimit = info.PoolLimit
556594
}
595+
596+
cluster.minPoolSize = info.MinPoolSize
597+
cluster.maxIdleTimeMS = info.MaxIdleTimeMS
598+
557599
cluster.Release()
558600

559601
// People get confused when we return a session that is not actually

session_test.go

+86
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ import (
3030
"flag"
3131
"fmt"
3232
"math"
33+
"math/rand"
3334
"os"
3435
"runtime"
3536
"sort"
3637
"strconv"
3738
"strings"
39+
"sync"
3840
"testing"
3941
"time"
4042

@@ -166,6 +168,90 @@ func (s *S) TestURLInvalidReadPreference(c *C) {
166168
}
167169
}
168170

171+
func (s *S) TestMinPoolSize(c *C) {
172+
tests := []struct {
173+
url string
174+
size int
175+
fail bool
176+
}{
177+
{"localhost:40001?minPoolSize=0", 0, false},
178+
{"localhost:40001?minPoolSize=1", 1, false},
179+
{"localhost:40001?minPoolSize=-1", -1, true},
180+
{"localhost:40001?minPoolSize=-.", 0, true},
181+
}
182+
for _, test := range tests {
183+
info, err := mgo.ParseURL(test.url)
184+
if test.fail {
185+
c.Assert(err, NotNil)
186+
} else {
187+
c.Assert(err, IsNil)
188+
c.Assert(info.MinPoolSize, Equals, test.size)
189+
}
190+
}
191+
}
192+
193+
func (s *S) TestMaxIdleTimeMS(c *C) {
194+
tests := []struct {
195+
url string
196+
size int
197+
fail bool
198+
}{
199+
{"localhost:40001?maxIdleTimeMS=0", 0, false},
200+
{"localhost:40001?maxIdleTimeMS=1", 1, false},
201+
{"localhost:40001?maxIdleTimeMS=-1", -1, true},
202+
{"localhost:40001?maxIdleTimeMS=-.", 0, true},
203+
}
204+
for _, test := range tests {
205+
info, err := mgo.ParseURL(test.url)
206+
if test.fail {
207+
c.Assert(err, NotNil)
208+
} else {
209+
c.Assert(err, IsNil)
210+
c.Assert(info.MaxIdleTimeMS, Equals, test.size)
211+
}
212+
}
213+
}
214+
215+
func (s *S) TestPoolShrink(c *C) {
216+
if *fast {
217+
c.Skip("-fast")
218+
}
219+
oldSocket := mgo.GetStats().SocketsAlive
220+
221+
session, err := mgo.Dial("localhost:40001?minPoolSize=1&maxIdleTimeMS=1000")
222+
c.Assert(err, IsNil)
223+
defer session.Close()
224+
225+
parallel := 10
226+
res := make(chan error, parallel+1)
227+
wg := &sync.WaitGroup{}
228+
for i := 1; i < parallel; i++ {
229+
wg.Add(1)
230+
go func() {
231+
s := session.Copy()
232+
defer s.Close()
233+
result := struct{}{}
234+
err := s.Run("ping", &result)
235+
236+
//sleep random time to make the allocate and release in different sequence
237+
time.Sleep(time.Duration(rand.Intn(parallel)*100) * time.Millisecond)
238+
res <- err
239+
wg.Done()
240+
}()
241+
}
242+
wg.Wait()
243+
stats := mgo.GetStats()
244+
c.Logf("living socket: After queries: %d, before queries: %d", stats.SocketsAlive, oldSocket)
245+
246+
// give some time for shrink the pool, the tick is set to 1 minute
247+
c.Log("Sleeping... 1 minute to for pool shrinking")
248+
time.Sleep(60 * time.Second)
249+
250+
stats = mgo.GetStats()
251+
c.Logf("living socket: After shrinking: %d, at the beginning of the test: %d", stats.SocketsAlive, oldSocket)
252+
c.Assert(stats.SocketsAlive-oldSocket > 1, Equals, false)
253+
}
254+
169255
func (s *S) TestURLReadPreferenceTags(c *C) {
170256
type test struct {
171257
url string

socket.go

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type mongoSocket struct {
5454
dead error
5555
serverInfo *mongoServerInfo
5656
closeAfterIdle bool
57+
lastTimeUsed time.Time // for time based idle socket release
5758
sendMeta sync.Once
5859
}
5960

0 commit comments

Comments
 (0)