Skip to content

Commit d86efbe

Browse files
committed
Add opentelemetry support
1. Added context.Context to most API methods for span propagation 2. Spans are added to the parent (from context) when an API is called 3. Simple cache metrics can be registered with cache.RegisterMetrics() Signed-off-by: Dave Tucker <[email protected]>
1 parent ab07436 commit d86efbe

File tree

17 files changed

+624
-107
lines changed

17 files changed

+624
-107
lines changed

.github/workflows/ci.yml

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ jobs:
8585
- 2.13.0
8686

8787
runs-on: ubuntu-latest
88-
88+
8989
steps:
9090
- name: Set up Go 1.16
9191
uses: actions/setup-go@v1
@@ -99,4 +99,50 @@ jobs:
9999
- name: Integration Test
100100
run: make integration-test
101101
env:
102-
OVS_IMAGE_TAG: ${{ matrix.ovs_version }}
102+
OVS_IMAGE_TAG: ${{ matrix.ovs_version }}
103+
104+
images:
105+
needs: build
106+
name: Build Image
107+
runs-on: ubuntu-latest
108+
strategy:
109+
matrix:
110+
cmd: [modelgen, print_schema, stress]
111+
112+
steps:
113+
- name: Check Out Repo
114+
uses: actions/checkout@v2
115+
116+
- name: Set up Docker Buildx
117+
id: buildx
118+
uses: docker/setup-buildx-action@v1
119+
120+
- name: Cache Docker layers
121+
uses: actions/cache@v2
122+
with:
123+
path: /tmp/.buildx-cache
124+
key: ${{ runner.os }}-buildx-${{ github.sha }}
125+
restore-keys: |
126+
${{ runner.os }}-buildx-
127+
128+
- name: Login to Docker Hub
129+
if: ${{ contains(github.ref, 'refs/head/main') }}
130+
uses: docker/login-action@v1
131+
with:
132+
username: ${{ secrets.DOCKER_HUB_USERNAME }}
133+
password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}
134+
135+
- name: Build and push
136+
id: docker_build
137+
uses: docker/build-push-action@v2
138+
with:
139+
context: .
140+
target: ${{ matrix.cmd }}
141+
builder: ${{ steps.buildx.outputs.name }}
142+
push: ${{ contains(github.ref, 'refs/head/main') }}
143+
tags: libovsdb/${{ matrix.cmd }}:latest
144+
cache-from: type=local,src=/tmp/.buildx-cache
145+
cache-to: type=local,dest=/tmp/.buildx-cache
146+
147+
- name: Image digest
148+
run: echo ${{ steps.docker_build.outputs.digest }}

Dockerfile

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
FROM golang:1.16-alpine as base
2+
COPY . /src
3+
WORKDIR /src
4+
5+
FROM base as stress
6+
RUN go install ./cmd/stress
7+
8+
FROM base as print_schema
9+
RUN go install ./cmd/print_schema
10+
11+
FROM base as modelgen
12+
RUN go install ./cmd/modelgen

cache/cache.go

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"bytes"
5+
"context"
56
"crypto/sha256"
67
"encoding/gob"
78
"encoding/hex"
@@ -15,8 +16,13 @@ import (
1516
"github.com/ovn-org/libovsdb/mapper"
1617
"github.com/ovn-org/libovsdb/model"
1718
"github.com/ovn-org/libovsdb/ovsdb"
19+
"go.opentelemetry.io/otel"
20+
"go.opentelemetry.io/otel/metric"
21+
"go.opentelemetry.io/otel/metric/global"
1822
)
1923

24+
var tracer = otel.Tracer("libovsdb.ovn.org/cache")
25+
2026
const (
2127
updateEvent = "update"
2228
addEvent = "add"
@@ -346,14 +352,16 @@ func NewTableCache(schema *ovsdb.DatabaseSchema, dbModel *model.DBModel, data Da
346352
}
347353
}
348354
}
349-
return &TableCache{
355+
356+
tc := &TableCache{
350357
cache: cache,
351358
schema: schema,
352359
eventProcessor: eventProcessor,
353360
mapper: mapper.NewMapper(schema),
354361
dbModel: dbModel,
355362
mutex: sync.RWMutex{},
356-
}, nil
363+
}
364+
return tc, nil
357365
}
358366

359367
// Mapper returns the mapper
@@ -389,11 +397,11 @@ func (t *TableCache) Tables() []string {
389397

390398
// Update implements the update method of the NotificationHandler interface
391399
// this populates the cache with new updates
392-
func (t *TableCache) Update(context interface{}, tableUpdates ovsdb.TableUpdates) {
400+
func (t *TableCache) Update(monitorID interface{}, tableUpdates ovsdb.TableUpdates) {
393401
if len(tableUpdates) == 0 {
394402
return
395403
}
396-
t.Populate(tableUpdates)
404+
t.Populate(context.TODO(), tableUpdates)
397405
}
398406

399407
// Locked implements the locked method of the NotificationHandler interface
@@ -412,8 +420,36 @@ func (t *TableCache) Echo([]interface{}) {
412420
func (t *TableCache) Disconnected() {
413421
}
414422

423+
// RegisterMetrics registers a metric for all known tables
424+
func (t *TableCache) RegisterMetrics() error {
425+
for _, tt := range t.Tables() {
426+
if err := t.registerMetric(tt); err != nil {
427+
return err
428+
}
429+
}
430+
return nil
431+
}
432+
433+
func (t *TableCache) registerMetric(table string) error {
434+
meter := global.Meter("libovsdb.ovn.org/cache")
435+
if _, ok := t.cache[table]; !ok {
436+
return fmt.Errorf("table not found")
437+
}
438+
_, err := meter.NewInt64ValueObserver(
439+
fmt.Sprintf("libovsdb.cache.%s.size", strings.ToLower(table)),
440+
func(ctx context.Context, result metric.Int64ObserverResult) {
441+
value := t.Table(table).Len()
442+
result.Observe(int64(value))
443+
},
444+
metric.WithDescription(fmt.Sprintf("the size of the %s table in the cache", strings.ToLower(table))),
445+
)
446+
return err
447+
}
448+
415449
// Populate adds data to the cache and places an event on the channel
416-
func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) {
450+
func (t *TableCache) Populate(ctx context.Context, tableUpdates ovsdb.TableUpdates) {
451+
_, span := tracer.Start(ctx, "cache_populate")
452+
defer span.End()
417453
t.mutex.Lock()
418454
defer t.mutex.Unlock()
419455

cache/cache_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cache
22

33
import (
4+
"context"
45
"testing"
56

67
"encoding/json"
@@ -721,7 +722,7 @@ func TestTableCache_populate(t *testing.T) {
721722
},
722723
},
723724
}
724-
tc.Populate(updates)
725+
tc.Populate(context.Background(), updates)
725726

726727
got := tc.Table("Open_vSwitch").Row("test")
727728
assert.Equal(t, testRowModel, got)
@@ -733,7 +734,7 @@ func TestTableCache_populate(t *testing.T) {
733734
Old: &testRow,
734735
New: &updatedRow,
735736
}
736-
tc.Populate(updates)
737+
tc.Populate(context.Background(), updates)
737738

738739
got = tc.cache["Open_vSwitch"].cache["test"]
739740
assert.Equal(t, updatedRowModel, got)
@@ -744,7 +745,7 @@ func TestTableCache_populate(t *testing.T) {
744745
New: nil,
745746
}
746747

747-
tc.Populate(updates)
748+
tc.Populate(context.Background(), updates)
748749

749750
_, ok := tc.cache["Open_vSwitch"].cache["test"]
750751
assert.False(t, ok)

client/client.go

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import (
2020
"github.com/ovn-org/libovsdb/mapper"
2121
"github.com/ovn-org/libovsdb/model"
2222
"github.com/ovn-org/libovsdb/ovsdb"
23+
"go.opentelemetry.io/otel"
24+
"go.opentelemetry.io/otel/attribute"
25+
"go.opentelemetry.io/otel/trace"
2326
)
2427

2528
// Constants defined for libovsdb
@@ -32,6 +35,9 @@ const (
3235
// ErrNotConnected is an error returned when the client is not connected
3336
var ErrNotConnected = errors.New("not connected")
3437

38+
// tracer is the tracer for opentelemetry
39+
var tracer = otel.Tracer("libovsdb.ovn.org/client")
40+
3541
// Client represents an OVSDB Client Connection
3642
// It provides all the necessary functionality to Connect to a server,
3743
// perform transactions, and build your own replica of the database with
@@ -46,12 +52,12 @@ type Client interface {
4652
SetOption(Option) error
4753
Connected() bool
4854
DisconnectNotify() chan struct{}
49-
Echo() error
50-
Transact(...ovsdb.Operation) ([]ovsdb.OperationResult, error)
51-
Monitor(...TableMonitor) (string, error)
52-
MonitorAll() (string, error)
53-
MonitorCancel(id string) error
54-
NewTableMonitor(m model.Model, fields ...interface{}) TableMonitor
55+
Echo(context.Context) error
56+
Transact(context.Context, ...ovsdb.Operation) ([]ovsdb.OperationResult, error)
57+
Monitor(context.Context, ...TableMonitor) (string, error)
58+
MonitorAll(context.Context) (string, error)
59+
MonitorCancel(context.Context, string) error
60+
NewTableMonitor(model.Model, ...interface{}) TableMonitor
5561
API
5662
}
5763

@@ -149,7 +155,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
149155
return err
150156
}
151157

152-
dbs, err := o.listDbs()
158+
dbs, err := o.listDbs(ctx)
153159
if err != nil {
154160
o.rpcClient.Close()
155161
return err
@@ -167,7 +173,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
167173
return fmt.Errorf("target database not found")
168174
}
169175

170-
schema, err := o.getSchema(o.dbModel.Name())
176+
schema, err := o.getSchema(ctx, o.dbModel.Name())
171177
errors := o.dbModel.Validate(schema)
172178
if len(errors) > 0 {
173179
var combined []string
@@ -205,7 +211,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
205211
o.monitorsMutex.Lock()
206212
defer o.monitorsMutex.Unlock()
207213
for id, request := range o.monitors {
208-
err = o.monitor(id, reconnect, request...)
214+
err = o.monitor(ctx, id, reconnect, request...)
209215
if err != nil {
210216
o.rpcClient.Close()
211217
return err
@@ -312,7 +318,9 @@ func (o *ovsdbClient) update(args []json.RawMessage, reply *[]interface{}) error
312318
// getSchema returns the schema in use for the provided database name
313319
// RFC 7047 : get_schema
314320
// Should only be called when mutex is held
315-
func (o *ovsdbClient) getSchema(dbName string) (*ovsdb.DatabaseSchema, error) {
321+
func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (*ovsdb.DatabaseSchema, error) {
322+
_, span := tracer.Start(ctx, "get_schema")
323+
defer span.End()
316324
args := ovsdb.NewGetSchemaArgs(dbName)
317325
var reply ovsdb.DatabaseSchema
318326
err := o.rpcClient.Call("get_schema", args, &reply)
@@ -328,7 +336,9 @@ func (o *ovsdbClient) getSchema(dbName string) (*ovsdb.DatabaseSchema, error) {
328336
// listDbs returns the list of databases on the server
329337
// RFC 7047 : list_dbs
330338
// Should only be called when mutex is held
331-
func (o *ovsdbClient) listDbs() ([]string, error) {
339+
func (o *ovsdbClient) listDbs(ctx context.Context) ([]string, error) {
340+
_, span := tracer.Start(ctx, "list_dbs")
341+
defer span.End()
332342
var dbs []string
333343
err := o.rpcClient.Call("list_dbs", nil, &dbs)
334344
if err != nil {
@@ -342,13 +352,17 @@ func (o *ovsdbClient) listDbs() ([]string, error) {
342352

343353
// Transact performs the provided Operations on the database
344354
// RFC 7047 : transact
345-
func (o *ovsdbClient) Transact(operation ...ovsdb.Operation) ([]ovsdb.OperationResult, error) {
355+
func (o *ovsdbClient) Transact(ctx context.Context, operation ...ovsdb.Operation) ([]ovsdb.OperationResult, error) {
356+
_, span := tracer.Start(ctx, "transact")
357+
defer span.End()
358+
span.AddEvent("validating operations")
346359
var reply []ovsdb.OperationResult
347360
if ok := o.Schema().ValidateOperations(operation...); !ok {
348361
return nil, fmt.Errorf("validation failed for the operation")
349362
}
350363
args := ovsdb.NewTransactArgs(o.schema.Name, operation...)
351364

365+
span.AddEvent("sending request to server")
352366
o.rpcMutex.Lock()
353367
if o.rpcClient == nil {
354368
o.rpcMutex.Unlock()
@@ -366,17 +380,21 @@ func (o *ovsdbClient) Transact(operation ...ovsdb.Operation) ([]ovsdb.OperationR
366380
}
367381

368382
// MonitorAll is a convenience method to monitor every table/column
369-
func (o *ovsdbClient) MonitorAll() (string, error) {
383+
func (o *ovsdbClient) MonitorAll(ctx context.Context) (string, error) {
384+
ctx, span := tracer.Start(ctx, "monitor_all")
385+
defer span.End()
370386
var options []TableMonitor
371387
for name := range o.dbModel.Types() {
372388
options = append(options, TableMonitor{Table: name})
373389
}
374-
return o.Monitor(options...)
390+
return o.Monitor(ctx, options...)
375391
}
376392

377393
// MonitorCancel will request cancel a previously issued monitor request
378394
// RFC 7047 : monitor_cancel
379-
func (o *ovsdbClient) MonitorCancel(id string) error {
395+
func (o *ovsdbClient) MonitorCancel(ctx context.Context, id string) error {
396+
_, span := tracer.Start(ctx, "monitor_cancel")
397+
defer span.End()
380398
var reply ovsdb.OperationResult
381399
args := ovsdb.NewMonitorCancelArgs(id)
382400
o.rpcMutex.Lock()
@@ -428,12 +446,16 @@ func (o *ovsdbClient) NewTableMonitor(m model.Model, fields ...interface{}) Tabl
428446
// and populate the cache with them. Subsequent updates will be processed
429447
// by the Update Notifications
430448
// RFC 7047 : monitor
431-
func (o *ovsdbClient) Monitor(options ...TableMonitor) (string, error) {
449+
func (o *ovsdbClient) Monitor(ctx context.Context, options ...TableMonitor) (string, error) {
450+
ctx, span := tracer.Start(ctx, "monitor")
451+
defer span.End()
432452
id := uuid.NewString()
433-
return id, o.monitor(id, false, options...)
453+
return id, o.monitor(ctx, id, false, options...)
434454
}
435455

436-
func (o *ovsdbClient) monitor(id string, reconnect bool, options ...TableMonitor) error {
456+
func (o *ovsdbClient) monitor(ctx context.Context, id string, reconnect bool, options ...TableMonitor) error {
457+
ctx, span := tracer.Start(ctx, "monitor.internal", trace.WithAttributes(attribute.String("id", id)))
458+
defer span.End()
437459
if len(options) == 0 {
438460
return fmt.Errorf("no monitor options provided")
439461
}
@@ -475,12 +497,14 @@ func (o *ovsdbClient) monitor(id string, reconnect bool, options ...TableMonitor
475497
defer o.monitorsMutex.Unlock()
476498
o.monitors[id] = options
477499
}
478-
o.cache.Populate(reply)
500+
o.cache.Populate(ctx, reply)
479501
return nil
480502
}
481503

482504
// Echo tests the liveness of the OVSDB connetion
483-
func (o *ovsdbClient) Echo() error {
505+
func (o *ovsdbClient) Echo(ctx context.Context) error {
506+
_, span := tracer.Start(ctx, "echo")
507+
defer span.End()
484508
args := ovsdb.NewEchoArgs()
485509
var reply []interface{}
486510
o.rpcMutex.RLock()

0 commit comments

Comments
 (0)