Skip to content

Commit 49be92a

Browse files
committed
Use Echo method to send inactivity probe
This commit uses client's Echo method to send inactivity probe as it can be made as a time bound call so that rpc read lock can not be held indefinitely. This would make disconnect happens immediately when current ovsdb leader is accidentally gone away (this happens in a very rare scenario in which case sendEcho method returns with unexpected EOF error after 12 mins only) and reconnects with new ovsdb leader. Signed-off-by: Periyasamy Palanisamy <[email protected]>
1 parent 239822f commit 49be92a

File tree

1 file changed

+12
-51
lines changed

1 file changed

+12
-51
lines changed

client/client.go

Lines changed: 12 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,72 +1197,33 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) {
11971197
}
11981198
}
11991199

1200-
func (o *ovsdbClient) sendEcho(args []interface{}, reply *[]interface{}) *rpc2.Call {
1201-
o.rpcMutex.RLock()
1202-
defer o.rpcMutex.RUnlock()
1203-
if o.rpcClient == nil {
1204-
return nil
1205-
}
1206-
return o.rpcClient.Go("echo", args, reply, make(chan *rpc2.Call, 1))
1207-
}
1208-
12091200
func (o *ovsdbClient) handleInactivityProbes() {
12101201
defer o.handlerShutdown.Done()
1211-
echoReplied := make(chan string)
1212-
var lastEcho string
12131202
stopCh := o.stopCh
12141203
trafficSeen := o.trafficSeen
1204+
timer := time.NewTimer(o.options.inactivityTimeout)
12151205
for {
12161206
select {
12171207
case <-stopCh:
12181208
return
12191209
case <-trafficSeen:
12201210
// We got some traffic from the server, restart our timer
1221-
case ts := <-echoReplied:
1222-
// Got a response from the server, check it against lastEcho; if same clear lastEcho; if not same Disconnect()
1223-
if ts != lastEcho {
1224-
o.Disconnect()
1225-
return
1211+
if !timer.Stop() {
1212+
<-timer.C
12261213
}
1227-
lastEcho = ""
1228-
case <-time.After(o.options.inactivityTimeout):
1229-
// If there's a lastEcho already, then we didn't get a server reply, disconnect
1230-
if lastEcho != "" {
1231-
o.Disconnect()
1232-
return
1233-
}
1234-
// Otherwise send an echo
1235-
thisEcho := fmt.Sprintf("%d", time.Now().UnixMicro())
1236-
args := []interface{}{"libovsdb echo", thisEcho}
1237-
var reply []interface{}
1238-
// Can't use o.Echo() because it blocks; we need the Call object direct from o.rpcClient.Go()
1239-
call := o.sendEcho(args, &reply)
1240-
if call == nil {
1241-
o.Disconnect()
1242-
return
1243-
}
1244-
lastEcho = thisEcho
1214+
case <-timer.C:
1215+
// Otherwise send an echo in a goroutine so that transactions don't block
12451216
go func() {
1246-
// Wait for the echo reply
1247-
select {
1248-
case <-stopCh:
1249-
return
1250-
case <-call.Done:
1251-
if call.Error != nil {
1252-
// RPC timeout; disconnect
1253-
o.logger.V(3).Error(call.Error, "server echo reply error")
1254-
o.Disconnect()
1255-
} else if !reflect.DeepEqual(args, reply) {
1256-
o.logger.V(3).Info("warning: incorrect server echo reply",
1257-
"expected", args, "reply", reply)
1258-
o.Disconnect()
1259-
} else {
1260-
// Otherwise stuff thisEcho into the echoReplied channel
1261-
echoReplied <- thisEcho
1262-
}
1217+
ctx, cancel := context.WithTimeout(context.Background(), o.options.timeout)
1218+
err := o.Echo(ctx)
1219+
if err != nil {
1220+
o.logger.V(3).Error(err, "server echo reply error")
1221+
o.Disconnect()
12631222
}
1223+
cancel()
12641224
}()
12651225
}
1226+
timer.Reset(o.options.inactivityTimeout)
12661227
}
12671228
}
12681229

0 commit comments

Comments
 (0)