Skip to content

Commit c03d8e3

Browse files
committed
Add support of heartbeat interval in "on change" subscription, so that
full sync would happen periodically when hb is triggered.
1 parent cbb7631 commit c03d8e3

File tree

2 files changed

+81
-17
lines changed

2 files changed

+81
-17
lines changed

gnmi_server/server_test.go

+40-11
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,7 @@ type subscriptionQuery struct {
709709
Query []string
710710
SubMode pb.SubscriptionMode
711711
SampleInterval uint64
712+
HeartbeatInterval uint64
712713
}
713714

714715
func pathToString(q client.Path) string {
@@ -745,6 +746,7 @@ func createQuery(subListMode pb.SubscriptionList_Mode, target string, queries []
745746
Path: pp,
746747
Mode: qq.SubMode,
747748
SampleInterval: qq.SampleInterval,
749+
HeartbeatInterval: qq.HeartbeatInterval,
748750
})
749751
}
750752

@@ -792,14 +794,15 @@ func createStateDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query
792794
}
793795

794796
// createCountersDbQueryOnChangeMode creates a query with ON_CHANGE mode.
795-
func createCountersDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query {
797+
func createCountersDbQueryOnChangeMode(t *testing.T, interval time.Duration, paths ...string) client.Query {
796798
return createQueryOrFail(t,
797799
pb.SubscriptionList_STREAM,
798800
"COUNTERS_DB",
799801
[]subscriptionQuery{
800802
{
801803
Query: paths,
802804
SubMode: pb.SubscriptionMode_ON_CHANGE,
805+
HeartbeatInterval: uint64(interval.Nanoseconds()),
803806
},
804807
},
805808
false)
@@ -1800,9 +1803,35 @@ func runTestSubscribe(t *testing.T, namespace string) {
18001803
generateIntervals bool
18011804
}
18021805
tests := []TestExec {
1806+
{
1807+
desc: "Testing invalid heartbeat interval",
1808+
q: createCountersDbQueryOnChangeMode(t, 10 * time.Second, "COUNTERS_PORT_NAME_MAP"),
1809+
wantSubErr: fmt.Errorf("rpc error: code = InvalidArgument desc = invalid heartbeat interval: 10s. It cannot be less than %v", sdc.MinHeartbeatInterval),
1810+
wantNoti: []client.Notification{},
1811+
},
1812+
{
1813+
desc: "stream query with Heartbeat interval for table key Ethernet68 with new test_field field",
1814+
q: createCountersDbQueryOnChangeMode(t, 30*time.second, "COUNTERS", "Ethernet68"),
1815+
updates: []tablePathValue{
1816+
{
1817+
dbName: "COUNTERS_DB",
1818+
tableName: "COUNTERS",
1819+
tableKey: "oid:0x1000000000039", // "Ethernet68": "oid:0x1000000000039",
1820+
delimitor: ":",
1821+
field: "test_field",
1822+
value: "test_value",
1823+
},
1824+
},
1825+
wantNoti: []client.Notification{
1826+
client.Connected{},
1827+
client.Update{Path: []string{"COUNTERS", "Ethernet68"}, TS: time.Unix(0, 200), Val: countersEthernet68Json},
1828+
client.Sync{},
1829+
client.Update{Path: []string{"COUNTERS", "Ethernet68"}, TS: time.Unix(0, 200), Val: countersEthernet68Json},
1830+
},
1831+
},
18031832
{
18041833
desc: "stream query for table COUNTERS_PORT_NAME_MAP with new test_field field",
1805-
q: createCountersDbQueryOnChangeMode(t, "COUNTERS_PORT_NAME_MAP"),
1834+
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS_PORT_NAME_MAP"),
18061835
updates: []tablePathValue{{
18071836
dbName: "COUNTERS_DB",
18081837
tableName: "COUNTERS_PORT_NAME_MAP",
@@ -1818,7 +1847,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
18181847
},
18191848
{
18201849
desc: "stream query for table key Ethernet68 with new test_field field",
1821-
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68"),
1850+
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68"),
18221851
updates: []tablePathValue{
18231852
{
18241853
dbName: "COUNTERS_DB",
@@ -1846,7 +1875,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
18461875
},
18471876
{
18481877
desc: "(use vendor alias) stream query for table key Ethernet68/1 with new test_field field",
1849-
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1"),
1878+
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1"),
18501879
updates: []tablePathValue{
18511880
{
18521881
dbName: "COUNTERS_DB",
@@ -1874,7 +1903,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
18741903
},
18751904
{
18761905
desc: "stream query for COUNTERS/Ethernet68/SAI_PORT_STAT_PFC_7_RX_PKTS with update of field value",
1877-
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
1906+
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
18781907
updates: []tablePathValue{
18791908
{
18801909
dbName: "COUNTERS_DB",
@@ -1902,7 +1931,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
19021931
},
19031932
{
19041933
desc: "(use vendor alias) stream query for COUNTERS/[Ethernet68/1]/SAI_PORT_STAT_PFC_7_RX_PKTS with update of field value",
1905-
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
1934+
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
19061935
updates: []tablePathValue{
19071936
{
19081937
dbName: "COUNTERS_DB",
@@ -1930,7 +1959,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
19301959
},
19311960
{
19321961
desc: "stream query for COUNTERS/Ethernet68/Pfcwd with update of field value",
1933-
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68", "Pfcwd"),
1962+
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68", "Pfcwd"),
19341963
updates: []tablePathValue{
19351964
{
19361965
dbName: "COUNTERS_DB",
@@ -1958,7 +1987,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
19581987
},
19591988
{
19601989
desc: "(use vendor alias) stream query for COUNTERS/[Ethernet68/1]/Pfcwd with update of field value",
1961-
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1", "Pfcwd"),
1990+
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1", "Pfcwd"),
19621991
updates: []tablePathValue{
19631992
{
19641993
dbName: "COUNTERS_DB",
@@ -1986,7 +2015,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
19862015
},
19872016
{
19882017
desc: "stream query for table key Ethernet* with new test_field field on Ethernet68",
1989-
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*"),
2018+
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*"),
19902019
updates: []tablePathValue{
19912020
{
19922021
dbName: "COUNTERS_DB",
@@ -2014,7 +2043,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
20142043
},
20152044
{
20162045
desc: "stream query for table key Ethernet*/SAI_PORT_STAT_PFC_7_RX_PKTS with field value update",
2017-
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
2046+
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
20182047
updates: []tablePathValue{
20192048
{
20202049
dbName: "COUNTERS_DB",
@@ -2034,7 +2063,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
20342063
},
20352064
{
20362065
desc: "stream query for table key Ethernet*/Pfcwd with field value update",
2037-
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*", "Pfcwd"),
2066+
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*", "Pfcwd"),
20382067
updates: []tablePathValue{
20392068
{
20402069
dbName: "COUNTERS_DB",

sonic_data_client/db_client.go

+41-6
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ var Target2RedisDb = make(map[string]map[string]*redis.Client)
7373
// Any non-zero value that less than this threshold is considered invalid argument.
7474
var MinSampleInterval = time.Second
7575

76+
// MinHeartbeatInterval is the lowest HB interval for streaming subscriptions.
77+
// This is reserved value, which should be adjusted per BGPL benchmark result.
78+
var MinHeartbeatInterval = 1 * time.Minute
79+
7680
// IntervalTicker is a factory method to implement interval ticking.
7781
// Exposed for UT purposes.
7882
var IntervalTicker = func(interval time.Duration) <-chan time.Time {
@@ -212,7 +216,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
212216
for gnmiPath := range c.pathG2S {
213217
c.w.Add(1)
214218
c.synced.Add(1)
215-
go streamOnChangeSubscription(c, gnmiPath)
219+
go streamOnChangeSubscription(c, gnmiPath, nil)
216220
}
217221
} else {
218222
log.V(2).Infof("Stream subscription request received, mode: %v, subscription count: %v",
@@ -230,7 +234,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
230234
} else if subMode == gnmipb.SubscriptionMode_ON_CHANGE {
231235
c.w.Add(1)
232236
c.synced.Add(1)
233-
go streamOnChangeSubscription(c, sub.GetPath())
237+
go streamOnChangeSubscription(c, nil, sub)
234238
} else {
235239
enqueueFatalMsg(c, fmt.Sprintf("unsupported subscription mode, %v", subMode))
236240
return
@@ -255,19 +259,36 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
255259
}
256260

257261
// streamOnChangeSubscription implements Subscription "ON_CHANGE STREAM" mode
258-
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path) {
262+
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path, sub *gnmipb.Subscription) {
263+
if gnmiPath == nil {
264+
gnmiPath = sub.GetPath()
265+
}
266+
267+
// if heartbeatInterval is not assigned, use 0 to ignore periodical full sync
268+
var heartbeatInterval time.Duration = 0
269+
if sub != nil {
270+
var err error
271+
heartbeatInterval, err = validateHeartbeatInterval(sub)
272+
if err != nil {
273+
enqueueFatalMsg(c, err.Error())
274+
c.synced.Done()
275+
c.w.Done()
276+
return
277+
}
278+
}
279+
259280
tblPaths := c.pathG2S[gnmiPath]
260281
log.V(2).Infof("streamOnChangeSubscription gnmiPath: %v", gnmiPath)
261282

262283
if tblPaths[0].field != "" {
263284
if len(tblPaths) > 1 {
264-
go dbFieldMultiSubscribe(c, gnmiPath, true, time.Millisecond*200, false)
285+
go dbFieldMultiSubscribe(c, gnmiPath, true, heartbeatInterval, false)
265286
} else {
266-
go dbFieldSubscribe(c, gnmiPath, true, time.Millisecond*200)
287+
go dbFieldSubscribe(c, gnmiPath, true, heartbeatInterval)
267288
}
268289
} else {
269290
// sample interval and update only parameters are not applicable
270-
go dbTableKeySubscribe(c, gnmiPath, 0, true)
291+
go dbTableKeySubscribe(c, gnmiPath, heartbeatInterval, true)
271292
}
272293
}
273294

@@ -1340,3 +1361,17 @@ func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) {
13401361
return requestedInterval, nil
13411362
}
13421363
}
1364+
1365+
// validateHeartbeatInterval validates the heartbeat interval of the given subscription.
1366+
func validateHeartbeatInterval(sub *gnmipb.Subscription) (time.Duration, error) {
1367+
requestedInterval := time.Duration(sub.GetHeartbeatInterval())
1368+
if requestedInterval == 0 {
1369+
// If the heartbeat_interval is set to 0, the target MUST create the subscription
1370+
// and send the data with the MinHeartbeatInterval
1371+
return MinHeartbeatInterval, nil
1372+
} else if requestedInterval < MinHeartbeatInterval {
1373+
return 0, fmt.Errorf("invalid heartbeat interval: %v. It cannot be less than %v", requestedInterval, MinHeartbeatInterval)
1374+
} else {
1375+
return requestedInterval, nil
1376+
}
1377+
}

0 commit comments

Comments
 (0)