@@ -24,6 +24,7 @@ import (
24
24
"github.com/ovn-org/libovsdb/model"
25
25
"github.com/ovn-org/libovsdb/ovsdb"
26
26
"github.com/ovn-org/libovsdb/ovsdb/serverdb"
27
+ "github.com/ovn-org/libovsdb/util"
27
28
)
28
29
29
30
// Constants defined for libovsdb
@@ -199,6 +200,65 @@ func (o *ovsdbClient) Connect(ctx context.Context) error {
199
200
return nil
200
201
}
201
202
203
+ func (db * database ) purge (name string ) error {
204
+ // If a table has any !Since monitors or has no conditions, purge it
205
+ // If a table only has Since monitors with conditions, purge only rows that match the conditions
206
+ type purge struct {
207
+ conditions []ovsdb.Condition
208
+ purgeAll bool
209
+ }
210
+
211
+ purges := make (map [string ]* purge )
212
+ for _ , monitor := range db .monitors {
213
+ for _ , tm := range monitor .Tables {
214
+ p , ok := purges [tm .Table ]
215
+ if ! ok {
216
+ p = & purge {}
217
+ purges [tm .Table ] = p
218
+ }
219
+ if monitor .Method == ovsdb .ConditionalMonitorSinceRPC {
220
+ model , err := db .model .NewModel (tm .Table )
221
+ if err != nil {
222
+ p .purgeAll = true
223
+ continue
224
+ }
225
+ info , err := db .model .NewModelInfo (model )
226
+ if err != nil {
227
+ p .purgeAll = true
228
+ continue
229
+ }
230
+ ovsdbCond , err := db .model .Mapper .NewCondition (info , tm .Condition .Field , tm .Condition .Function , tm .Condition .Value )
231
+ if err != nil {
232
+ p .purgeAll = true
233
+ continue
234
+ }
235
+ p .conditions = append (p .conditions , * ovsdbCond )
236
+ } else {
237
+ p .purgeAll = true
238
+ }
239
+ }
240
+ }
241
+ if len (purges ) == 0 {
242
+ db .cache .Purge (db .model )
243
+ return nil
244
+ }
245
+
246
+ var purgeErrors []error
247
+ for name , p := range purges {
248
+ if p .purgeAll {
249
+ if err := db .cache .PurgeTable (db .model , name ); err != nil {
250
+ purgeErrors = append (purgeErrors , err )
251
+ }
252
+ } else {
253
+ if err := db .cache .PurgeTableRows (db .model , name , p .conditions ); err != nil {
254
+ purgeErrors = append (purgeErrors , err )
255
+ }
256
+ }
257
+ }
258
+
259
+ return util .CombineErrors (purgeErrors , fmt .Sprintf ("failed to purge database %s" , name ))
260
+ }
261
+
202
262
func (o * ovsdbClient ) connect (ctx context.Context , reconnect bool ) error {
203
263
o .rpcMutex .Lock ()
204
264
defer o .rpcMutex .Unlock ()
@@ -226,15 +286,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
226
286
}
227
287
228
288
if ! connected {
229
- if len (connectErrors ) == 1 {
230
- return connectErrors [0 ]
231
- }
232
- var combined []string
233
- for _ , e := range connectErrors {
234
- combined = append (combined , e .Error ())
235
- }
236
-
237
- return fmt .Errorf ("unable to connect to any endpoints: %s" , strings .Join (combined , ". " ))
289
+ return util .CombineErrors (connectErrors , "unable to connect to any endpoints" )
238
290
}
239
291
240
292
// if we're reconnecting, re-start all the monitors
@@ -243,6 +295,10 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
243
295
for dbName , db := range o .databases {
244
296
db .monitorsMutex .Lock ()
245
297
defer db .monitorsMutex .Unlock ()
298
+
299
+ if err := db .purge (dbName ); err != nil {
300
+ o .logger .V (3 ).Error (err , "failed to purge" )
301
+ }
246
302
for id , request := range db .monitors {
247
303
err := o .monitor (ctx , MonitorCookie {DatabaseName : dbName , ID : id }, true , request )
248
304
if err != nil {
@@ -349,8 +405,6 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
349
405
return err
350
406
}
351
407
db .api = newAPI (db .cache , o .logger )
352
- } else {
353
- db .cache .Purge (db .model )
354
408
}
355
409
db .cacheMutex .Unlock ()
356
410
}
@@ -832,12 +886,7 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
832
886
833
887
var args []interface {}
834
888
if monitor .Method == ovsdb .ConditionalMonitorSinceRPC {
835
- // FIXME: We should pass the monitor.LastTransactionID here
836
- // But that would require delaying clearing the cache until
837
- // after the monitors have been re-established - the logic
838
- // would also need to be different for monitor and monitor_cond
839
- // as we must always clear the cache in that instance
840
- args = ovsdb .NewMonitorCondSinceArgs (dbName , cookie , requests , emptyUUID )
889
+ args = ovsdb .NewMonitorCondSinceArgs (dbName , cookie , requests , monitor .LastTransactionID )
841
890
} else {
842
891
args = ovsdb .NewMonitorArgs (dbName , cookie , requests )
843
892
}
0 commit comments