Skip to content

Commit 3ad9102

Browse files
committed
client: watch for leader loss, disconnect
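This sets up a monitor on the Database table of the _Server meta-database, and disconnects if the connected endpoint loses leadership. It also adds some useful locking (the new CurrentEndpoint accessor reads the active endpoint under rpcMutex). Signed-off-by: Casey Callendrello <[email protected]>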
1 parent aff8fbc commit 3ad9102

File tree

1 file changed

+77
-5
lines changed

1 file changed

+77
-5
lines changed

client/client.go

Lines changed: 77 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@ type Client interface {
5656
MonitorAll(context.Context) (MonitorCookie, error)
5757
MonitorCancel(ctx context.Context, cookie MonitorCookie) error
5858
NewTableMonitor(m model.Model, fields ...interface{}) TableMonitor
59+
CurrentEndpoint() string
5960
API
6061
}
6162

@@ -68,9 +69,10 @@ type MonitorCookie struct {
6869

6970
// ovsdbClient is an OVSDB client
7071
type ovsdbClient struct {
71-
options *options
72-
rpcClient *rpc2.Client
73-
rpcMutex sync.RWMutex
72+
options *options
73+
rpcClient *rpc2.Client
74+
rpcMutex sync.RWMutex
75+
activeEndpoint string
7476

7577
// The name of the "primary" database - that is to say, the DB
7678
// that the user expects to interact with.
@@ -143,7 +145,16 @@ func newOVSDBClient(databaseModel *model.DBModel, opts ...Option) (*ovsdbClient,
143145
// The connection can be configured using one or more Option(s), like WithTLSConfig
144146
// If no WithEndpoint option is supplied, the default of unix:/var/run/openvswitch/ovsdb.sock is used
145147
func (o *ovsdbClient) Connect(ctx context.Context) error {
// Removed by this commit (old line 146): Connect used to return the result
// of connect directly.
146-
return o.connect(ctx, false)
// Added by this commit: establish the connection first (reconnect=false) and
// return early if that fails.
148+
if err := o.connect(ctx, false); err != nil {
149+
return err
150+
}
151+
// When the leaderOnly option is set, start watching the Database table so
// the client disconnects if this endpoint stops being the leader.
// NOTE(review): ctx is not forwarded; watchForLeaderChange issues its
// monitor request with context.Background().
// NOTE(review): if watchForLeaderChange fails, the error is returned but the
// connection opened above is not closed here - confirm callers clean up.
152+
if o.options.leaderOnly {
153+
if err := o.watchForLeaderChange(); err != nil {
154+
return err
155+
}
156+
}
157+
return nil
147158
}
148159

149160
func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
@@ -165,6 +176,8 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
165176
fmt.Errorf("failed to connect to %s: %w", endpoint, err))
166177
continue
167178
} else {
179+
log.Printf("libovsdb: connected to %s", endpoint)
180+
o.activeEndpoint = endpoint
168181
connected = true
169182
break
170183
}
@@ -184,6 +197,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
184197

185198
// if we're reconnecting, re-start all the monitors
186199
if reconnect {
200+
log.Printf("libovsdb: reconnected - restarting monitors")
187201
for dbName, db := range o.databases {
188202
db.monitorsMutex.Lock()
189203
defer db.monitorsMutex.Unlock()
@@ -200,10 +214,12 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
200214
for _, db := range o.databases {
201215
go db.cache.Run(o.stopCh)
202216
}
217+
203218
return nil
204219
}
205220

206221
func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
222+
log.Printf("libovsdb: trying to connect to DB %s", u)
207223
var dialer net.Dialer
208224
var err error
209225
var c net.Conn
@@ -421,6 +437,15 @@ func (o *ovsdbClient) Connected() bool {
421437
return o.rpcClient != nil
422438
}
423439

440+
// CurrentEndpoint returns the endpoint recorded by the most recent successful
// connection attempt, or the empty string when there is no RPC client (the
// same condition Connected reports as "not connected").
// The read is performed while holding rpcMutex for reading.
func (o *ovsdbClient) CurrentEndpoint() string {
441+
o.rpcMutex.RLock()
442+
defer o.rpcMutex.RUnlock()
// No RPC client means no live connection, so report no endpoint.
443+
if o.rpcClient == nil {
444+
return ""
445+
}
446+
return o.activeEndpoint
447+
}
448+
424449
// DisconnectNotify returns a channel which will notify the caller when the
425450
// server has disconnected
426451
func (o *ovsdbClient) DisconnectNotify() chan struct{} {
@@ -680,6 +705,48 @@ func (o *ovsdbClient) Echo(ctx context.Context) error {
680705
return nil
681706
}
682707

708+
// watchForLeaderChange disconnects the client if the connected endpoint
709+
// ever loses leadership of the primary database.
//
// It registers a cache event handler on the serverDB database, starts a
// monitor on its "Database" table, and launches a goroutine that inspects
// every updated row. When the row for the primary database belongs to a
// clustered database and is no longer marked Leader, Disconnect is called.
// Per the log message a reconnect is expected to follow; that reconnect is
// not performed by this function.
//
// It returns the error from starting the monitor, or nil once the watcher
// goroutine has been started.
710+
func (o *ovsdbClient) watchForLeaderChange() error {
// updates is unbuffered: the event handler below blocks on each send until
// the goroutine further down is ready to receive.
711+
updates := make(chan model.Model)
712+
o.databases[serverDB].cache.AddEventHandler(&cache.EventHandlerFuncs{
713+
UpdateFunc: func(table string, _, new model.Model) {
// Forward only the new version of rows from the Database table.
714+
if table == "Database" {
715+
updates <- new
716+
}
717+
},
718+
})
719+
// Start monitoring the Database table. The request uses
// context.Background(), so it is not bound to any caller-supplied context.
// NOTE(review): on failure the handler registered above stays in place and
// nothing receives from updates - confirm whether that can block the cache.
720+
err := o.monitor(context.Background(), newMonitorCookie(serverDB), false,
721+
TableMonitor{
722+
Table: "Database",
723+
},
724+
)
725+
if err != nil {
726+
return err
727+
}
728+
// Consume forwarded rows in the background. Nothing in this function closes
// updates, so this loop does not terminate on its own.
729+
go func() {
730+
for m := range updates {
// Skip anything that is not a Database row model.
731+
dbInfo, ok := m.(*serverdb.Database)
732+
if !ok {
733+
continue
734+
}
735+
736+
// Only the row describing the primary database is of interest; skip all
// other rows (for example the one for _Server itself).
737+
if dbInfo.Name != o.primaryDBName {
738+
continue
739+
}
740+
// Only a clustered database that is not the leader triggers a disconnect.
// NOTE(review): o.activeEndpoint is read here without holding rpcMutex,
// whereas CurrentEndpoint reads it under RLock - confirm this cannot race
// with connect, which assigns the field.
741+
if dbInfo.Model == serverdb.DatabaseModelClustered && !dbInfo.Leader {
742+
log.Printf("libovsdb: endpoint %s lost leader, reconnecting", o.activeEndpoint)
743+
o.Disconnect()
744+
}
745+
}
746+
}()
747+
return nil
748+
}
749+
683750
func (o *ovsdbClient) handleDisconnectNotification() {
684751
<-o.rpcClient.DisconnectNotify()
685752
// close the stopCh, which will stop the cache event processor
@@ -691,8 +758,13 @@ func (o *ovsdbClient) handleDisconnectNotification() {
691758
connect := func() error {
692759
ctx, cancel := context.WithTimeout(context.Background(), o.options.timeout)
693760
defer cancel()
694-
return o.connect(ctx, true)
761+
err := o.connect(ctx, true)
762+
if err != nil {
763+
log.Printf("libovsdb: failed to reconnect: %s", err)
764+
}
765+
return err
695766
}
767+
log.Printf("libovsdb: connection to %s lost, reconnecting...", o.activeEndpoint)
696768
err := backoff.Retry(connect, o.options.backoff)
697769
if err != nil {
698770
// TODO: We should look at passing this back to the

0 commit comments

Comments
 (0)