Skip to content

Commit a12fcd8

Browse files
Merge pull request sonic-net#13 from renukamanavalan/statistics
Telemetry support for streaming events
2 parents 28b2f27 + 8d9e564 commit a12fcd8

File tree

9 files changed

+644
-27
lines changed

9 files changed

+644
-27
lines changed

azure-pipelines.yml

+39-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ resources:
1919
type: github
2020
name: sonic-net/sonic-mgmt-common
2121
endpoint: sonic-net
22+
- repository: sonic-swss-common
23+
type: github
24+
name: sonic-net/sonic-swss-common
25+
endpoint: sonic-net
2226

2327
stages:
2428
- stage: Build
@@ -47,7 +51,12 @@ stages:
4751
- checkout: sonic-mgmt-common
4852
clean: true
4953
submodules: recursive
50-
displayName: 'Checkout code'
54+
displayName: 'Checkout sonic-mgmt-common'
55+
56+
- checkout: sonic-swss-common
57+
clean: true
58+
submodules: recursive
59+
displayName: 'Checkout sonic-swss-common'
5160

5261
- task: DownloadPipelineArtifact@2
5362
inputs:
@@ -81,6 +90,16 @@ stages:
8190
sudo dpkg -i ../target/debs/buster/libyang*1.0.73*.deb
8291
displayName: "Install dependency"
8392
93+
- script: |
94+
# LIBSWSSCOMMON
95+
sudo apt-get -y purge libhiredis-dev libnl-3-dev libnl-route-3-dev
96+
sudo dpkg -i ../target/debs/buster/libnl-3-200_*.deb
97+
sudo dpkg -i ../target/debs/buster/libnl-genl-3-200_*.deb
98+
sudo dpkg -i ../target/debs/buster/libnl-route-3-200_*.deb
99+
sudo dpkg -i ../target/debs/buster/libnl-nf-3-200_*.deb
100+
sudo dpkg -i ../target/debs/buster/libhiredis0.14_*.deb
101+
displayName: "Install libswsscommon dependencies"
102+
84103
- script: |
85104
set -ex
86105
# Install .NET CORE
@@ -90,6 +109,25 @@ stages:
90109
sudo apt-get install -y dotnet-sdk-5.0
91110
displayName: "Install .NET CORE"
92111
112+
- task: DownloadPipelineArtifact@2
113+
inputs:
114+
source: specific
115+
project: build
116+
pipeline: Azure.sonic-swss-common
117+
artifact: sonic-swss-common
118+
runVersion: 'latestFromBranch'
119+
runBranch: 'refs/heads/master'
120+
displayName: "Download sonic-swss-common"
121+
122+
- script: |
123+
set -ex
124+
# LIBSWSSCOMMON
125+
sudo dpkg -i libswsscommon_1.0.0_amd64.deb
126+
sudo dpkg -i libswsscommon-dev_1.0.0_amd64.deb
127+
sudo dpkg -i python3-swsscommon_1.0.0_amd64.deb
128+
workingDirectory: $(Pipeline.Workspace)/
129+
displayName: 'Install libswsscommon package'
130+
93131
- script: |
94132
set -ex
95133
ls -l

gnmi_server/client_subscribe.go

+24-3
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,28 @@ type Client struct {
3030
// Wait for all sub go routine to finish
3131
w sync.WaitGroup
3232
fatal bool
33+
logLevel int
3334
}
3435

36+
// Syslog level for error
37+
const logLevelError int = 3
38+
const logLevelDebug int = 7
39+
const logLevelMax int = logLevelDebug
40+
3541
// NewClient returns a new initialized client.
3642
func NewClient(addr net.Addr) *Client {
3743
pq := queue.NewPriorityQueue(1, false)
3844
return &Client{
3945
addr: addr,
4046
q: pq,
47+
logLevel: logLevelError,
4148
}
4249
}
4350

51+
func (c *Client) setLogLevel(lvl int) {
52+
c.logLevel = lvl
53+
}
54+
4455
// String returns the target the client is querying.
4556
func (c *Client) String() string {
4657
return c.addr.String()
@@ -121,8 +132,12 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
121132
}
122133
var dc sdc.Client
123134

135+
mode := c.subscribe.GetMode()
136+
124137
if target == "OTHERS" {
125138
dc, err = sdc.NewNonDbClient(paths, prefix)
139+
} else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) {
140+
dc, err = sdc.NewEventClient(paths, prefix, c.logLevel)
126141
} else if _, ok, _, _ := sdc.IsTargetDb(target); ok {
127142
dc, err = sdc.NewDbClient(paths, prefix)
128143
} else {
@@ -134,7 +149,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
134149
return grpc.Errorf(codes.NotFound, "%v", err)
135150
}
136151

137-
switch mode := c.subscribe.GetMode(); mode {
152+
switch mode {
138153
case gnmipb.SubscriptionList_STREAM:
139154
c.stop = make(chan struct{}, 1)
140155
c.w.Add(1)
@@ -155,7 +170,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
155170

156171
log.V(1).Infof("Client %s running", c)
157172
go c.recv(stream)
158-
err = c.send(stream)
173+
err = c.send(stream, dc)
159174
c.Close()
160175
// Wait until all child go routines exited
161176
c.w.Wait()
@@ -226,8 +241,9 @@ func (c *Client) recv(stream gnmipb.GNMI_SubscribeServer) {
226241
}
227242

228243
// send runs until process Queue returns an error.
229-
func (c *Client) send(stream gnmipb.GNMI_SubscribeServer) error {
244+
func (c *Client) send(stream gnmipb.GNMI_SubscribeServer, dc sdc.Client) error {
230245
for {
246+
var val *sdc.Value
231247
items, err := c.q.Get(1)
232248

233249
if items == nil {
@@ -241,12 +257,14 @@ func (c *Client) send(stream gnmipb.GNMI_SubscribeServer) error {
241257
}
242258

243259
var resp *gnmipb.SubscribeResponse
260+
244261
switch v := items[0].(type) {
245262
case sdc.Value:
246263
if resp, err = sdc.ValToResp(v); err != nil {
247264
c.errors++
248265
return err
249266
}
267+
val = &v;
250268
default:
251269
log.V(1).Infof("Unknown data type %v for %s in queue", items[0], c)
252270
c.errors++
@@ -257,8 +275,11 @@ func (c *Client) send(stream gnmipb.GNMI_SubscribeServer) error {
257275
if err != nil {
258276
log.V(1).Infof("Client %s sending error:%v", c, err)
259277
c.errors++
278+
dc.FailedSend()
260279
return err
261280
}
281+
282+
dc.SentOne(val)
262283
log.V(5).Infof("Client %s done sending, msg count %d, msg %v", c, c.sendMsg, resp)
263284
}
264285
}

gnmi_server/server.go

+3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type Config struct {
4747
// Port for the Server to listen on. If 0 or unset the Server will pick a port
4848
// for this Server.
4949
Port int64
50+
LogLevel int
5051
UserAuth AuthTypes
5152
}
5253

@@ -234,6 +235,8 @@ func (s *Server) Subscribe(stream gnmipb.GNMI_SubscribeServer) error {
234235

235236
c := NewClient(pr.Addr)
236237

238+
c.setLogLevel(s.config.LogLevel)
239+
237240
s.cMu.Lock()
238241
if oc, ok := s.clients[c.String()]; ok {
239242
log.V(2).Infof("Delete duplicate client %s", oc)

gnmi_server/server_test.go

+129
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"flag"
99
"fmt"
1010
"strings"
11+
"unsafe"
1112

1213
testcert "github.com/sonic-net/sonic-gnmi/testdata/tls"
1314
"github.com/go-redis/redis"
@@ -516,6 +517,20 @@ func createQueryOrFail(t *testing.T, subListMode pb.SubscriptionList_Mode, targe
516517
return *q
517518
}
518519

520+
// create query for subscribing to events.
521+
func createEventsQuery(t *testing.T, paths ...string) client.Query {
522+
return createQueryOrFail(t,
523+
pb.SubscriptionList_STREAM,
524+
"EVENTS",
525+
[]subscriptionQuery{
526+
{
527+
Query: paths,
528+
SubMode: pb.SubscriptionMode_ON_CHANGE,
529+
},
530+
},
531+
false)
532+
}
533+
519534
// createCountersDbQueryOnChangeMode creates a query with ON_CHANGE mode.
520535
func createCountersDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query {
521536
return createQueryOrFail(t,
@@ -2587,6 +2602,120 @@ func TestAuthCapabilities(t *testing.T) {
25872602

25882603
}
25892604

2605+
func TestClient(t *testing.T) {
2606+
events := [] sdc.Evt_rcvd {
2607+
{ "test0", 7, 777 },
2608+
{ "test1", 6, 677 },
2609+
{ "test2", 5, 577 },
2610+
{ "test3", 4, 477 },
2611+
}
2612+
2613+
HEARTBEAT_SET := 5
2614+
heartbeat := 0
2615+
event_index := 0
2616+
rcv_timeout := sdc.SUBSCRIBER_TIMEOUT
2617+
deinit_done := false
2618+
2619+
mock1 := gomonkey.ApplyFunc(sdc.C_init_subs, func() unsafe.Pointer {
2620+
return nil
2621+
})
2622+
defer mock1.Reset()
2623+
2624+
mock2 := gomonkey.ApplyFunc(sdc.C_recv_evt, func(h unsafe.Pointer) (int, sdc.Evt_rcvd) {
2625+
rc := (int)(0)
2626+
var evt sdc.Evt_rcvd
2627+
2628+
if event_index < len(events) {
2629+
evt = events[event_index]
2630+
event_index++
2631+
} else {
2632+
time.Sleep(time.Millisecond * time.Duration(rcv_timeout))
2633+
rc = -1
2634+
}
2635+
return rc, evt
2636+
})
2637+
defer mock2.Reset()
2638+
2639+
mock3 := gomonkey.ApplyFunc(sdc.Set_heartbeat, func(val int) {
2640+
heartbeat = val
2641+
})
2642+
2643+
defer mock3.Reset()
2644+
2645+
mock4 := gomonkey.ApplyFunc(sdc.C_deinit_subs, func(h unsafe.Pointer) {
2646+
deinit_done = true
2647+
})
2648+
2649+
defer mock4.Reset()
2650+
2651+
s := createServer(t, 8081)
2652+
go runServer(t, s)
2653+
2654+
qstr := fmt.Sprintf("all[heartbeat=%d]", HEARTBEAT_SET)
2655+
q := createEventsQuery(t, qstr)
2656+
// q := createEventsQuery(t, "all")
2657+
q.Addrs = []string{"127.0.0.1:8081"}
2658+
2659+
tests := []struct {
2660+
desc string
2661+
pub_data []string
2662+
wantErr bool
2663+
wantNoti []client.Notification
2664+
pause int
2665+
poll int
2666+
} {
2667+
{
2668+
desc: "base client create",
2669+
poll: 3,
2670+
},
2671+
}
2672+
2673+
sdc.C_init_subs()
2674+
2675+
for _, tt := range tests {
2676+
heartbeat = 0
2677+
deinit_done = false
2678+
t.Run(tt.desc, func(t *testing.T) {
2679+
c := client.New()
2680+
defer c.Close()
2681+
2682+
var gotNoti []string
2683+
q.NotificationHandler = func(n client.Notification) error {
2684+
if nn, ok := n.(client.Update); ok {
2685+
nn.TS = time.Unix(0, 200)
2686+
str := fmt.Sprintf("%v", nn.Val)
2687+
gotNoti = append(gotNoti, str)
2688+
}
2689+
return nil
2690+
}
2691+
2692+
go func() {
2693+
c.Subscribe(context.Background(), q)
2694+
}()
2695+
2696+
// wait for half second for subscribeRequest to sync
2697+
time.Sleep(time.Millisecond * 2000)
2698+
2699+
if len(events) != len(gotNoti) {
2700+
t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events))
2701+
}
2702+
2703+
if (heartbeat != HEARTBEAT_SET) {
2704+
t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET)
2705+
}
2706+
fmt.Printf("DONE: events:%d gotNoti=%d\n", len(events), len(gotNoti))
2707+
})
2708+
time.Sleep(time.Millisecond * 1000)
2709+
2710+
if (deinit_done == false) {
2711+
t.Errorf("Events client deinit *NOT* called.")
2712+
}
2713+
// t.Log("END of a TEST")
2714+
}
2715+
2716+
s.s.Stop()
2717+
}
2718+
25902719
func init() {
25912720
// Enable logs at UT setup
25922721
flag.Lookup("v").Value.Set("10")

sonic_data_client/db_client.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ type Client interface {
5151

5252
// Close provides implemenation for explicit cleanup of Client
5353
Close() error
54+
55+
// callbacks on send failed
56+
FailedSend()
57+
58+
// callback on sent
59+
SentOne(*Value)
5460
}
5561

5662
type Stream interface {
@@ -271,7 +277,6 @@ func (c *DbClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.W
271277
SyncResponse: false,
272278
Val: val,
273279
}
274-
275280
c.q.Put(Value{spbv})
276281
log.V(6).Infof("Added spbv #%v", spbv)
277282
}
@@ -488,7 +493,7 @@ func populateDbtablePath(prefix, path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]
488493
}
489494

490495
if targetDbName == "COUNTERS_DB" {
491-
err := initCountersPortNameMap()
496+
err := initCountersPortNameMap()
492497
if err != nil {
493498
return err
494499
}
@@ -1230,6 +1235,12 @@ func (c *DbClient) Capabilities() []gnmipb.ModelData {
12301235
return nil
12311236
}
12321237

1238+
func (c *DbClient) SentOne(val *Value) {
1239+
}
1240+
1241+
func (c *DbClient) FailedSend() {
1242+
}
1243+
12331244
// validateSampleInterval validates the sampling interval of the given subscription.
12341245
func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) {
12351246
requestedInterval := time.Duration(sub.GetSampleInterval())

0 commit comments

Comments
 (0)