@@ -20,6 +20,9 @@ import (
20
20
"github.com/ovn-org/libovsdb/mapper"
21
21
"github.com/ovn-org/libovsdb/model"
22
22
"github.com/ovn-org/libovsdb/ovsdb"
23
+ "go.opentelemetry.io/otel"
24
+ "go.opentelemetry.io/otel/attribute"
25
+ "go.opentelemetry.io/otel/trace"
23
26
)
24
27
25
28
// Constants defined for libovsdb
@@ -32,6 +35,9 @@ const (
32
35
// ErrNotConnected is an error returned when the client is not connected
33
36
var ErrNotConnected = errors .New ("not connected" )
34
37
38
+ // tracer is the tracer for opentelemetry
39
+ var tracer = otel .Tracer ("libovsdb.ovn.org/client" )
40
+
35
41
// Client represents an OVSDB Client Connection
36
42
// It provides all the necessary functionality to Connect to a server,
37
43
// perform transactions, and build your own replica of the database with
@@ -46,12 +52,12 @@ type Client interface {
46
52
SetOption (Option ) error
47
53
Connected () bool
48
54
DisconnectNotify () chan struct {}
49
- Echo () error
50
- Transact (... ovsdb.Operation ) ([]ovsdb.OperationResult , error )
51
- Monitor (... TableMonitor ) (string , error )
52
- MonitorAll () (string , error )
53
- MonitorCancel (id string ) error
54
- NewTableMonitor (m model.Model , fields ... interface {}) TableMonitor
55
+ Echo (context. Context ) error
56
+ Transact (context. Context , ... ovsdb.Operation ) ([]ovsdb.OperationResult , error )
57
+ Monitor (context. Context , ... TableMonitor ) (string , error )
58
+ MonitorAll (context. Context ) (string , error )
59
+ MonitorCancel (context. Context , string ) error
60
+ NewTableMonitor (model.Model , ... interface {}) TableMonitor
55
61
API
56
62
}
57
63
@@ -149,7 +155,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
149
155
return err
150
156
}
151
157
152
- dbs , err := o .listDbs ()
158
+ dbs , err := o .listDbs (ctx )
153
159
if err != nil {
154
160
o .rpcClient .Close ()
155
161
return err
@@ -167,7 +173,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
167
173
return fmt .Errorf ("target database not found" )
168
174
}
169
175
170
- schema , err := o .getSchema (o .dbModel .Name ())
176
+ schema , err := o .getSchema (ctx , o .dbModel .Name ())
171
177
errors := o .dbModel .Validate (schema )
172
178
if len (errors ) > 0 {
173
179
var combined []string
@@ -205,7 +211,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
205
211
o .monitorsMutex .Lock ()
206
212
defer o .monitorsMutex .Unlock ()
207
213
for id , request := range o .monitors {
208
- err = o .monitor (id , reconnect , request ... )
214
+ err = o .monitor (ctx , id , reconnect , request ... )
209
215
if err != nil {
210
216
o .rpcClient .Close ()
211
217
return err
@@ -312,7 +318,9 @@ func (o *ovsdbClient) update(args []json.RawMessage, reply *[]interface{}) error
312
318
// getSchema returns the schema in use for the provided database name
313
319
// RFC 7047 : get_schema
314
320
// Should only be called when mutex is held
315
- func (o * ovsdbClient ) getSchema (dbName string ) (* ovsdb.DatabaseSchema , error ) {
321
+ func (o * ovsdbClient ) getSchema (ctx context.Context , dbName string ) (* ovsdb.DatabaseSchema , error ) {
322
+ _ , span := tracer .Start (ctx , "get_schema" )
323
+ defer span .End ()
316
324
args := ovsdb .NewGetSchemaArgs (dbName )
317
325
var reply ovsdb.DatabaseSchema
318
326
err := o .rpcClient .Call ("get_schema" , args , & reply )
@@ -328,7 +336,9 @@ func (o *ovsdbClient) getSchema(dbName string) (*ovsdb.DatabaseSchema, error) {
328
336
// listDbs returns the list of databases on the server
329
337
// RFC 7047 : list_dbs
330
338
// Should only be called when mutex is held
331
- func (o * ovsdbClient ) listDbs () ([]string , error ) {
339
+ func (o * ovsdbClient ) listDbs (ctx context.Context ) ([]string , error ) {
340
+ _ , span := tracer .Start (ctx , "list_dbs" )
341
+ defer span .End ()
332
342
var dbs []string
333
343
err := o .rpcClient .Call ("list_dbs" , nil , & dbs )
334
344
if err != nil {
@@ -342,13 +352,17 @@ func (o *ovsdbClient) listDbs() ([]string, error) {
342
352
343
353
// Transact performs the provided Operations on the database
344
354
// RFC 7047 : transact
345
- func (o * ovsdbClient ) Transact (operation ... ovsdb.Operation ) ([]ovsdb.OperationResult , error ) {
355
+ func (o * ovsdbClient ) Transact (ctx context.Context , operation ... ovsdb.Operation ) ([]ovsdb.OperationResult , error ) {
356
+ _ , span := tracer .Start (ctx , "transact" )
357
+ defer span .End ()
358
+ span .AddEvent ("validating operations" )
346
359
var reply []ovsdb.OperationResult
347
360
if ok := o .Schema ().ValidateOperations (operation ... ); ! ok {
348
361
return nil , fmt .Errorf ("validation failed for the operation" )
349
362
}
350
363
args := ovsdb .NewTransactArgs (o .schema .Name , operation ... )
351
364
365
+ span .AddEvent ("sending request to server" )
352
366
o .rpcMutex .Lock ()
353
367
if o .rpcClient == nil {
354
368
o .rpcMutex .Unlock ()
@@ -366,17 +380,21 @@ func (o *ovsdbClient) Transact(operation ...ovsdb.Operation) ([]ovsdb.OperationR
366
380
}
367
381
368
382
// MonitorAll is a convenience method to monitor every table/column
369
- func (o * ovsdbClient ) MonitorAll () (string , error ) {
383
+ func (o * ovsdbClient ) MonitorAll (ctx context.Context ) (string , error ) {
384
+ ctx , span := tracer .Start (ctx , "monitor_all" )
385
+ defer span .End ()
370
386
var options []TableMonitor
371
387
for name := range o .dbModel .Types () {
372
388
options = append (options , TableMonitor {Table : name })
373
389
}
374
- return o .Monitor (options ... )
390
+ return o .Monitor (ctx , options ... )
375
391
}
376
392
377
393
// MonitorCancel will request cancel a previously issued monitor request
378
394
// RFC 7047 : monitor_cancel
379
- func (o * ovsdbClient ) MonitorCancel (id string ) error {
395
+ func (o * ovsdbClient ) MonitorCancel (ctx context.Context , id string ) error {
396
+ _ , span := tracer .Start (ctx , "monitor_cancel" )
397
+ defer span .End ()
380
398
var reply ovsdb.OperationResult
381
399
args := ovsdb .NewMonitorCancelArgs (id )
382
400
o .rpcMutex .Lock ()
@@ -428,12 +446,16 @@ func (o *ovsdbClient) NewTableMonitor(m model.Model, fields ...interface{}) Tabl
428
446
// and populate the cache with them. Subsequent updates will be processed
429
447
// by the Update Notifications
430
448
// RFC 7047 : monitor
431
- func (o * ovsdbClient ) Monitor (options ... TableMonitor ) (string , error ) {
449
+ func (o * ovsdbClient ) Monitor (ctx context.Context , options ... TableMonitor ) (string , error ) {
450
+ ctx , span := tracer .Start (ctx , "monitor" )
451
+ defer span .End ()
432
452
id := uuid .NewString ()
433
- return id , o .monitor (id , false , options ... )
453
+ return id , o .monitor (ctx , id , false , options ... )
434
454
}
435
455
436
- func (o * ovsdbClient ) monitor (id string , reconnect bool , options ... TableMonitor ) error {
456
+ func (o * ovsdbClient ) monitor (ctx context.Context , id string , reconnect bool , options ... TableMonitor ) error {
457
+ ctx , span := tracer .Start (ctx , "monitor.internal" , trace .WithAttributes (attribute .String ("id" , id )))
458
+ defer span .End ()
437
459
if len (options ) == 0 {
438
460
return fmt .Errorf ("no monitor options provided" )
439
461
}
@@ -475,12 +497,14 @@ func (o *ovsdbClient) monitor(id string, reconnect bool, options ...TableMonitor
475
497
defer o .monitorsMutex .Unlock ()
476
498
o .monitors [id ] = options
477
499
}
478
- o .cache .Populate (reply )
500
+ o .cache .Populate (ctx , reply )
479
501
return nil
480
502
}
481
503
482
504
// Echo tests the liveness of the OVSDB connetion
483
- func (o * ovsdbClient ) Echo () error {
505
+ func (o * ovsdbClient ) Echo (ctx context.Context ) error {
506
+ _ , span := tracer .Start (ctx , "echo" )
507
+ defer span .End ()
484
508
args := ovsdb .NewEchoArgs ()
485
509
var reply []interface {}
486
510
o .rpcMutex .RLock ()
0 commit comments