Skip to content

Commit 330edf0

Browse files
committed
client: honor last transaction IDs
Signed-off-by: Dan Williams <[email protected]>
1 parent a619f0f commit 330edf0

File tree

3 files changed

+113
-19
lines changed

3 files changed

+113
-19
lines changed

cache/cache.go

+37
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/ovn-org/libovsdb/mapper"
1818
"github.com/ovn-org/libovsdb/model"
1919
"github.com/ovn-org/libovsdb/ovsdb"
20+
"github.com/ovn-org/libovsdb/util"
2021
)
2122

2223
const (
@@ -718,6 +719,42 @@ func (t *TableCache) Purge(dbModel model.DatabaseModel) {
718719
}
719720
}
720721

722+
// PurgeTable drops all data in the given table's cache and reinitializes it using the
723+
// provided database model
724+
func (t *TableCache) PurgeTable(dbModel model.DatabaseModel, name string) error {
725+
return t.PurgeTableRows(dbModel, name, nil)
726+
}
727+
728+
// PurgeTableRows drops all rows in the given table's cache that match the given conditions
729+
func (t *TableCache) PurgeTableRows(dbModel model.DatabaseModel, name string, conditions []ovsdb.Condition) error {
730+
t.mutex.Lock()
731+
defer t.mutex.Unlock()
732+
t.dbModel = dbModel
733+
tableTypes := t.dbModel.Types()
734+
dataType, ok := tableTypes[name]
735+
if !ok {
736+
return fmt.Errorf("table %s not found", name)
737+
}
738+
if len(conditions) == 0 {
739+
t.cache[name] = newRowCache(name, t.dbModel, dataType)
740+
return nil
741+
}
742+
743+
r := t.cache[name]
744+
rows, err := r.RowsByCondition(conditions)
745+
if err != nil {
746+
return err
747+
}
748+
delErrors := []error{}
749+
for uuid := range rows {
750+
if err := r.Delete(uuid); err != nil {
751+
delErrors = append(delErrors, fmt.Errorf("failed to delete %s: %w", uuid, err))
752+
}
753+
}
754+
755+
return util.CombineErrors(delErrors, "failed to delete rows")
756+
}
757+
721758
// AddEventHandler registers the supplied EventHandler to receive cache events
722759
func (t *TableCache) AddEventHandler(handler EventHandler) {
723760
t.eventProcessor.AddEventHandler(handler)

client/client.go

+56-19
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/ovn-org/libovsdb/model"
2525
"github.com/ovn-org/libovsdb/ovsdb"
2626
"github.com/ovn-org/libovsdb/ovsdb/serverdb"
27+
"github.com/ovn-org/libovsdb/util"
2728
)
2829

2930
// Constants defined for libovsdb
@@ -260,15 +261,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
260261
}
261262

262263
if !connected {
263-
if len(connectErrors) == 1 {
264-
return connectErrors[0]
265-
}
266-
var combined []string
267-
for _, e := range connectErrors {
268-
combined = append(combined, e.Error())
269-
}
270-
271-
return fmt.Errorf("unable to connect to any endpoints: %s", strings.Join(combined, ". "))
264+
return util.CombineErrors(connectErrors, "unable to connect to any endpoints")
272265
}
273266

274267
// if we're reconnecting, re-start all the monitors
@@ -371,8 +364,6 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) (string, erro
371364
return "", err
372365
}
373366
db.api = newAPI(db.cache, o.logger)
374-
} else {
375-
db.cache.Purge(db.model)
376367
}
377368
db.cacheMutex.Unlock()
378369
}
@@ -809,6 +800,51 @@ func (o *ovsdbClient) Monitor(ctx context.Context, monitor *Monitor) (MonitorCoo
809800
return cookie, o.monitor(ctx, cookie, false, monitor)
810801
}
811802

803+
func (db *database) getMonitorTableConditions(tm TableMonitor) (*ovsdb.Condition, error) {
804+
model, err := db.model.NewModel(tm.Table)
805+
if err != nil {
806+
return nil, err
807+
}
808+
info, err := db.model.NewModelInfo(model)
809+
if err != nil {
810+
return nil, err
811+
}
812+
return db.model.Mapper.NewCondition(info, tm.Condition.Field, tm.Condition.Function, tm.Condition.Value)
813+
}
814+
815+
// purge removes all rows from the row cache that match the monitor
816+
func (o *ovsdbClient) purge(db *database, monitor *Monitor) {
817+
if len(monitor.Tables) == 0 {
818+
db.cache.Purge(db.model)
819+
return
820+
}
821+
822+
var err error
823+
for _, tm := range monitor.Tables {
824+
if monitor.Method == ovsdb.ConditionalMonitorSinceRPC {
825+
var cond *ovsdb.Condition
826+
cond, err = db.getMonitorTableConditions(tm)
827+
if err != nil {
828+
break
829+
}
830+
err = db.cache.PurgeTableRows(db.model, tm.Table, []ovsdb.Condition{*cond})
831+
if err != nil {
832+
break
833+
}
834+
} else {
835+
err = db.cache.PurgeTable(db.model, tm.Table)
836+
if err != nil {
837+
break
838+
}
839+
}
840+
}
841+
842+
if err != nil {
843+
o.logger.V(3).Error(err, "failed to purge database")
844+
db.cache.Purge(db.model)
845+
}
846+
}
847+
812848
//gocyclo:ignore
813849
// monitor must only be called with a lock on monitorsMutex
814850
func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconnecting bool, monitor *Monitor) error {
@@ -859,12 +895,7 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
859895

860896
var args []interface{}
861897
if monitor.Method == ovsdb.ConditionalMonitorSinceRPC {
862-
// FIXME: We should pass the monitor.LastTransactionID here
863-
// But that would require delaying clearing the cache until
864-
// after the monitors have been re-established - the logic
865-
// would also need to be different for monitor and monitor_cond
866-
// as we must always clear the cache in that instance
867-
args = ovsdb.NewMonitorCondSinceArgs(dbName, cookie, requests, emptyUUID)
898+
args = ovsdb.NewMonitorCondSinceArgs(dbName, cookie, requests, monitor.LastTransactionID)
868899
} else {
869900
args = ovsdb.NewMonitorArgs(dbName, cookie, requests)
870901
}
@@ -873,18 +904,24 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
873904

874905
switch monitor.Method {
875906
case ovsdb.MonitorRPC:
907+
o.purge(db, monitor)
876908
var reply ovsdb.TableUpdates
877909
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
878910
tableUpdates = reply
879911
case ovsdb.ConditionalMonitorRPC:
912+
o.purge(db, monitor)
880913
var reply ovsdb.TableUpdates2
881914
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
882915
tableUpdates = reply
883916
case ovsdb.ConditionalMonitorSinceRPC:
884917
var reply ovsdb.MonitorCondSinceReply
885918
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
886-
if err == nil && reply.Found {
887-
monitor.LastTransactionID = reply.LastTransactionID
919+
if err == nil {
920+
if reply.Found {
921+
monitor.LastTransactionID = reply.LastTransactionID
922+
} else {
923+
o.purge(db, monitor)
924+
}
888925
}
889926
tableUpdates = reply.Updates
890927
default:

util/errors.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package util
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
)
7+
8+
func CombineErrors(errors []error, msg string) error {
9+
if len(errors) == 0 {
10+
return nil
11+
} else if len(errors) == 1 {
12+
return errors[0]
13+
}
14+
15+
var combined []string
16+
for _, e := range errors {
17+
combined = append(combined, e.Error())
18+
}
19+
return fmt.Errorf("%s: %s", msg, strings.Join(combined, ". "))
20+
}

0 commit comments

Comments
 (0)