Commit f48d937

Konstantinos Tsanaktsidis authored and domodwyer committed
Add signaling support for connection pool waiting (globalsign#115)
* Add signaling support for connection pool waiting

  The current behaviour when the poolLimit is reached and a new connection is required is to poll every 100ms to see if there is now headroom to make a new connection. This adds tremendous latency to the limit-hit path. This commit changes the checkout behaviour to wait on a condition variable for connections to become available, and the checkin behaviour to signal this variable. This should allow waiters to use connections immediately after they become available.

  A new parameter is also added to DialInfo, PoolTimeout, which is the maximum time that clients will wait for connection headroom to become available. By default this is unlimited.

* Add stats for connection pool timings

  This exposes four new counters:
  * The number of times a socket was successfully obtained from the connection pool
  * The number of times the connection pool needed to be waited on
  * The total time that has been spent waiting for a connection to become available
  * The number of times socket acquisition failed due to a pool timeout

* Gitignore .vscode directory

  I'm using vscode and accidentally committed the .vscode directory; .gitignore this footgun.
1 parent 00e7550 commit f48d937
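For context, this is roughly how a caller would opt in to the new behaviour once this commit is merged. The sketch below assumes the globalsign/mgo import path and uses a placeholder address and pool sizes; it illustrates the new PoolTimeout field and SetPoolTimeout method introduced here, and is not code from the commit itself.

package main

import (
	"log"
	"time"

	"github.com/globalsign/mgo"
)

func main() {
	info := &mgo.DialInfo{
		Addrs:       []string{"localhost:27017"}, // placeholder address
		Timeout:     10 * time.Second,
		PoolLimit:   64,              // per-server socket limit
		PoolTimeout: 2 * time.Second, // new in this commit: cap the wait for pool headroom
	}
	session, err := mgo.DialWithInfo(info)
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// The timeout can also be adjusted on a live session.
	session.SetPoolTimeout(500 * time.Millisecond)

	if err := session.Ping(); err != nil {
		// With PoolTimeout set, a saturated pool surfaces as an error
		// ("could not acquire connection within pool timeout") instead of
		// blocking the caller indefinitely.
		log.Printf("ping failed: %v", err)
	}
}

Leaving PoolTimeout at its zero value preserves the old behaviour of waiting indefinitely, so existing callers are unaffected.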

File tree (6 files changed, +195 −26 lines):

  .gitignore
  cluster.go
  cluster_test.go
  server.go
  session.go
  stats.go

.gitignore (+1 −1)

@@ -1,2 +1,2 @@
 _harness
-
+.vscode

cluster.go (+13 −9)

@@ -618,9 +618,17 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
 // true, it will attempt to return a socket to a slave server. If it is
 // false, the socket will necessarily be to a master server.
 func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
+	return cluster.AcquireSocketWithPoolTimeout(mode, slaveOk, syncTimeout, socketTimeout, serverTags, poolLimit, 0)
+}
+
+// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is
+// true, it will attempt to return a socket to a slave server. If it is
+// false, the socket will necessarily be to a master server.
+func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
+	mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration,
+) (s *mongoSocket, err error) {
 	var started time.Time
 	var syncCount uint
-	warnedLimit := false
 	for {
 		cluster.RLock()
 		for {
@@ -662,14 +670,10 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout
 			continue
 		}

-		s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
-		if err == errPoolLimit {
-			if !warnedLimit {
-				warnedLimit = true
-				log("WARNING: Per-server connection limit reached.")
-			}
-			time.Sleep(100 * time.Millisecond)
-			continue
+		s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout)
+		if err == errPoolTimeout {
+			// No need to remove servers from the topology if acquiring a socket fails for this reason.
+			return nil, err
 		}
 		if err != nil {
 			cluster.removeServer(server)
cluster_test.go (+42)

@@ -1583,6 +1583,9 @@ func (s *S) TestPoolLimitSimple(c *C) {
 		}
 		defer session.Close()

+		// So we can measure the stats for the blocking operation
+		mgo.ResetStats()
+
 		// Put one socket in use.
 		c.Assert(session.Ping(), IsNil)

@@ -1603,6 +1606,11 @@ func (s *S) TestPoolLimitSimple(c *C) {
 		session.Refresh()
 		delay := <-done
 		c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
+		stats := mgo.GetStats()
+		c.Assert(stats.TimesSocketAcquired, Equals, 2)
+		c.Assert(stats.TimesWaitedForPool, Equals, 1)
+		c.Assert(stats.PoolTimeouts, Equals, 0)
+		c.Assert(stats.TotalPoolWaitTime > 300*time.Millisecond, Equals, true)
 	}
 }

@@ -1649,6 +1657,40 @@ func (s *S) TestPoolLimitMany(c *C) {
 	c.Assert(delay < 6e9, Equals, true)
 }

+func (s *S) TestPoolLimitTimeout(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40011")
+	c.Assert(err, IsNil)
+	defer session.Close()
+	session.SetPoolTimeout(1 * time.Second)
+	session.SetPoolLimit(1)
+
+	mgo.ResetStats()
+
+	// Put one socket in use.
+	c.Assert(session.Ping(), IsNil)
+
+	// Now block trying to get another one due to the pool limit.
+	copy := session.Copy()
+	defer copy.Close()
+	started := time.Now()
+	err = copy.Ping()
+	delay := time.Since(started)
+
+	c.Assert(delay > 900*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
+	c.Assert(delay < 1100*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
+	c.Assert(strings.Contains(err.Error(), "could not acquire connection within pool timeout"), Equals, true, Commentf("Error: %s", err))
+	stats := mgo.GetStats()
+	c.Assert(stats.PoolTimeouts, Equals, 1)
+	c.Assert(stats.TimesSocketAcquired, Equals, 1)
+	c.Assert(stats.TimesWaitedForPool, Equals, 1)
+	c.Assert(stats.TotalPoolWaitTime > 900*time.Millisecond, Equals, true)
+	c.Assert(stats.TotalPoolWaitTime < 1100*time.Millisecond, Equals, true)
+}
+
 func (s *S) TestSetModeEventualIterBug(c *C) {
 	session1, err := mgo.Dial("localhost:40011")
 	c.Assert(err, IsNil)

server.go (+79 −6)

@@ -57,6 +57,7 @@ type mongoServer struct {
 	abended       bool
 	minPoolSize   int
 	maxIdleTimeMS int
+	poolWaiter    *sync.Cond
 }

 type dialer struct {
@@ -78,18 +79,19 @@ type mongoServerInfo struct {

 var defaultServerInfo mongoServerInfo

-func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
+func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
 	server := &mongoServer{
 		Addr:          addr,
 		ResolvedAddr:  tcpaddr.String(),
 		tcpaddr:       tcpaddr,
-		sync:          sync,
+		sync:          syncChan,
 		dial:          dial,
 		info:          &defaultServerInfo,
 		pingValue:     time.Hour, // Push it back before an actual ping.
 		minPoolSize:   minPoolSize,
 		maxIdleTimeMS: maxIdleTimeMS,
 	}
+	server.poolWaiter = sync.NewCond(server)
 	go server.pinger(true)
 	if maxIdleTimeMS != 0 {
 		go server.poolShrinker()
@@ -98,6 +100,7 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, m
 }

 var errPoolLimit = errors.New("per-server connection limit reached")
+var errPoolTimeout = errors.New("could not acquire connection within pool timeout")
 var errServerClosed = errors.New("server was closed")

 // AcquireSocket returns a socket for communicating with the server.
@@ -109,18 +112,80 @@ var errServerClosed = errors.New("server was closed")
 // use in this server is greater than the provided limit, errPoolLimit is
 // returned.
 func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
+	return server.acquireSocketInternal(poolLimit, timeout, false, 0*time.Millisecond)
+}
+
+// AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_
+// return errPoolLimit. Instead, it will block waiting for a socket to become available. If poolTimeout
+// should elapse before a socket is available, it will return errPoolTimeout.
+func (server *mongoServer) AcquireSocketWithBlocking(
+	poolLimit int, socketTimeout time.Duration, poolTimeout time.Duration,
+) (socket *mongoSocket, abended bool, err error) {
+	return server.acquireSocketInternal(poolLimit, socketTimeout, true, poolTimeout)
+}
+
+func (server *mongoServer) acquireSocketInternal(
+	poolLimit int, timeout time.Duration, shouldBlock bool, poolTimeout time.Duration,
+) (socket *mongoSocket, abended bool, err error) {
 	for {
 		server.Lock()
 		abended = server.abended
 		if server.closed {
 			server.Unlock()
 			return nil, abended, errServerClosed
 		}
-		n := len(server.unusedSockets)
-		if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit {
-			server.Unlock()
-			return nil, false, errPoolLimit
+		if poolLimit > 0 {
+			if shouldBlock {
+				// Go's sync.Cond has no WaitWithTimeout, so the timeout is implemented
+				// with a wait plus a broadcast: the broadcast causes the loop below to
+				// re-check the timeout and fail if it has been exceeded.
+				// This is a spurious wakeup, but we can't do a directed signal without one
+				// condition variable per waiter, which would require traversing a list of
+				// waiters in the RecycleSocket method.
+				// We also can't use the approach of turning a condition variable into a
+				// channel outlined in https://github.com/golang/go/issues/16620, since the
+				// lock needs to be held in _this_ goroutine.
+				waitDone := make(chan struct{})
+				timeoutHit := false
+				if poolTimeout > 0 {
+					go func() {
+						select {
+						case <-waitDone:
+						case <-time.After(poolTimeout):
+							// timeoutHit is part of the wait condition, so it must be changed under the mutex.
+							server.Lock()
+							defer server.Unlock()
+							timeoutHit = true
+							server.poolWaiter.Broadcast()
+						}
+					}()
+				}
+				timeSpentWaiting := time.Duration(0)
+				for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit {
+					// We only count time spent in Wait(), and not time evaluating the entire loop,
+					// so that in the happy non-blocking path where the condition above evaluates true
+					// the first time, we record a nice round zero wait time.
+					waitStart := time.Now()
+					// Wait unlocks the server mutex, waits, and locks it again. Thus, the condition
+					// above is evaluated only while the lock is held.
+					server.poolWaiter.Wait()
+					timeSpentWaiting += time.Since(waitStart)
+				}
+				close(waitDone)
+				if timeoutHit {
+					server.Unlock()
+					stats.noticePoolTimeout(timeSpentWaiting)
+					return nil, false, errPoolTimeout
+				}
+				// Record that we fetched a connection out of a socket list and how long we spent waiting.
+				stats.noticeSocketAcquisition(timeSpentWaiting)
+			} else {
+				if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
+					server.Unlock()
+					return nil, false, errPoolLimit
+				}
+			}
 		}
+		n := len(server.unusedSockets)
 		if n > 0 {
 			socket = server.unusedSockets[n-1]
 			server.unusedSockets[n-1] = nil // Help GC.
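To see the synchronization pattern of the hunk above in isolation: sync.Cond has no WaitWithTimeout, so a helper goroutine flips a flag under the lock and broadcasts, forcing waiters to re-evaluate their condition. The following is a minimal, self-contained sketch of that pattern; the pool type, its field names, and errTimeout are invented for illustration and are not part of mgo (the real code keys the condition on len(liveSockets)-len(unusedSockets) and a per-call poolLimit).

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errTimeout = errors.New("timed out waiting for a slot")

type pool struct {
	mu    sync.Mutex
	cond  *sync.Cond
	inUse int
	limit int
}

func newPool(limit int) *pool {
	p := &pool{limit: limit}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// acquire blocks until a slot is free or, if timeout > 0, until timeout elapses.
func (p *pool) acquire(timeout time.Duration) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	waitDone := make(chan struct{})
	timedOut := false
	if timeout > 0 {
		go func() {
			select {
			case <-waitDone:
			case <-time.After(timeout):
				p.mu.Lock()
				timedOut = true // part of the wait condition, so set under the lock
				p.mu.Unlock()
				p.cond.Broadcast() // wake waiters so they notice the timeout
			}
		}()
	}
	for p.inUse >= p.limit && !timedOut {
		p.cond.Wait() // releases p.mu while waiting, reacquires it before returning
	}
	close(waitDone)
	if timedOut {
		return errTimeout
	}
	p.inUse++
	return nil
}

// release returns a slot and wakes every waiter, since each waiter may have a
// different effective limit to re-check (mirroring RecycleSocket below).
func (p *pool) release() {
	p.mu.Lock()
	p.inUse--
	p.mu.Unlock()
	p.cond.Broadcast()
}

func main() {
	p := newPool(1)
	_ = p.acquire(0)                         // takes the only slot immediately
	err := p.acquire(100 * time.Millisecond) // second acquire times out
	fmt.Println(err)                         // "timed out waiting for a slot"
	p.release()
}

The release side's Broadcast is the counterpart of the Broadcast added to RecycleSocket in the next hunk.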
@@ -231,6 +296,14 @@ func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
 		socket.lastTimeUsed = time.Now()
 		server.unusedSockets = append(server.unusedSockets, socket)
 	}
+	// If anybody is waiting for a connection, they should try now.
+	// Note that this _has_ to be a broadcast, not a signal; the signatures of AcquireSocket
+	// and AcquireSocketWithBlocking allow the caller to specify the max number of connections,
+	// rather than that being an intrinsic property of the connection pool (presumably to ensure
+	// that there is always a connection available for replset topology discovery). Thus, once
+	// a connection is returned to the pool, _every_ waiter needs to check if the connection count
+	// is under its particular value for poolLimit.
+	server.poolWaiter.Broadcast()
 	server.Unlock()
 }
session.go (+24 −1)

@@ -92,6 +92,7 @@ type Session struct {
 	syncTimeout time.Duration
 	sockTimeout time.Duration
 	poolLimit   int
+	poolTimeout time.Duration
 	consistency Mode
 	creds       []Credential
 	dialCred    *Credential
@@ -486,6 +487,11 @@ type DialInfo struct {
 	// See Session.SetPoolLimit for details.
 	PoolLimit int

+	// PoolTimeout defines the maximum time to wait for a connection to become
+	// available if the pool limit is reached. Defaults to zero, which means
+	// wait forever. See Session.SetPoolTimeout for details.
+	PoolTimeout time.Duration
+
 	// The identifier of the client application which ran the operation.
 	AppName string

@@ -596,6 +602,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
 	cluster.minPoolSize = info.MinPoolSize
 	cluster.maxIdleTimeMS = info.MaxIdleTimeMS

+	if info.PoolTimeout > 0 {
+		session.poolTimeout = info.PoolTimeout
+	}
+
 	cluster.Release()

 	// People get confused when we return a session that is not actually
@@ -711,6 +721,7 @@ func copySession(session *Session, keepCreds bool) (s *Session) {
 		syncTimeout: session.syncTimeout,
 		sockTimeout: session.sockTimeout,
 		poolLimit:   session.poolLimit,
+		poolTimeout: session.poolTimeout,
 		consistency: session.consistency,
 		creds:       creds,
 		dialCred:    session.dialCred,
@@ -2051,6 +2062,16 @@ func (s *Session) SetPoolLimit(limit int) {
 	s.m.Unlock()
 }

+// SetPoolTimeout sets the maximum time connection attempts will wait to reuse
+// an existing connection from the pool if the PoolLimit has been reached. If
+// the value is exceeded, the attempt to use a session will fail with an error.
+// The default value is zero, which means to wait forever with no timeout.
+func (s *Session) SetPoolTimeout(timeout time.Duration) {
+	s.m.Lock()
+	s.poolTimeout = timeout
+	s.m.Unlock()
+}
+
 // SetBypassValidation sets whether the server should bypass the registered
 // validation expressions executed when documents are inserted or modified,
 // in the interest of preserving invariants in the collection being modified.
@@ -4908,7 +4929,9 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
 	}

 	// Still not good. We need a new socket.
-	sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
+	sock, err := s.cluster().AcquireSocketWithPoolTimeout(
+		s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit, s.poolTimeout,
+	)
 	if err != nil {
 		return nil, err
 	}

stats.go (+36 −9)

@@ -28,6 +28,7 @@ package mgo

 import (
 	"sync"
+	"time"
 )

 var stats *Stats
@@ -77,15 +78,19 @@ func ResetStats() {
 //
 // TODO outdated fields ?
 type Stats struct {
-	Clusters     int
-	MasterConns  int
-	SlaveConns   int
-	SentOps      int
-	ReceivedOps  int
-	ReceivedDocs int
-	SocketsAlive int
-	SocketsInUse int
-	SocketRefs   int
+	Clusters            int
+	MasterConns         int
+	SlaveConns          int
+	SentOps             int
+	ReceivedOps         int
+	ReceivedDocs        int
+	SocketsAlive        int
+	SocketsInUse        int
+	SocketRefs          int
+	TimesSocketAcquired int
+	TimesWaitedForPool  int
+	TotalPoolWaitTime   time.Duration
+	PoolTimeouts        int
 }

 func (stats *Stats) cluster(delta int) {
@@ -155,3 +160,25 @@ func (stats *Stats) socketRefs(delta int) {
 		statsMutex.Unlock()
 	}
 }
+
+func (stats *Stats) noticeSocketAcquisition(waitTime time.Duration) {
+	if stats != nil {
+		statsMutex.Lock()
+		stats.TimesSocketAcquired++
+		stats.TotalPoolWaitTime += waitTime
+		if waitTime > 0 {
+			stats.TimesWaitedForPool++
+		}
+		statsMutex.Unlock()
+	}
+}
+
+func (stats *Stats) noticePoolTimeout(waitTime time.Duration) {
+	if stats != nil {
+		statsMutex.Lock()
+		stats.TimesWaitedForPool++
+		stats.PoolTimeouts++
+		stats.TotalPoolWaitTime += waitTime
+		statsMutex.Unlock()
+	}
+}
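The new counters are only collected while stats gathering is enabled. As a rough usage sketch (assuming the globalsign/mgo import path and a placeholder address; SetStats, GetStats and ResetStats are the package's existing stats entry points), an application could sample them to watch for pool saturation:

package main

import (
	"log"

	"github.com/globalsign/mgo"
)

func main() {
	mgo.SetStats(true) // counters are only collected while stats are enabled

	session, err := mgo.Dial("localhost:27017") // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// ... run the workload of interest ...

	s := mgo.GetStats()
	log.Printf("acquired=%d waitedForPool=%d poolTimeouts=%d totalWait=%s",
		s.TimesSocketAcquired, s.TimesWaitedForPool, s.PoolTimeouts, s.TotalPoolWaitTime)

	// Reset between sampling intervals for per-interval numbers.
	mgo.ResetStats()
}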
