Skip to content

Commit d57f25b

Browse files
committed
Add support of heartbeat interval in "on change" subscription, so that
full sync would happen periodically when hb is triggered.
1 parent cbb7631 commit d57f25b

File tree

1 file changed

+41
-6
lines changed

1 file changed

+41
-6
lines changed

sonic_data_client/db_client.go

+41-6
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ var Target2RedisDb = make(map[string]map[string]*redis.Client)
7373
// Any non-zero value that less than this threshold is considered invalid argument.
7474
var MinSampleInterval = time.Second
7575

76+
// MinHeartbeatInterval is the lowest HB interval for streaming subscriptions.
77+
// This is reserved value, which should be adjusted per BGPL benchmark result.
78+
var MinHeartbeatInterval = 3 * time.Minute
79+
7680
// IntervalTicker is a factory method to implement interval ticking.
7781
// Exposed for UT purposes.
7882
var IntervalTicker = func(interval time.Duration) <-chan time.Time {
@@ -212,7 +216,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
212216
for gnmiPath := range c.pathG2S {
213217
c.w.Add(1)
214218
c.synced.Add(1)
215-
go streamOnChangeSubscription(c, gnmiPath)
219+
go streamOnChangeSubscription(c, gnmiPath, nil)
216220
}
217221
} else {
218222
log.V(2).Infof("Stream subscription request received, mode: %v, subscription count: %v",
@@ -230,7 +234,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
230234
} else if subMode == gnmipb.SubscriptionMode_ON_CHANGE {
231235
c.w.Add(1)
232236
c.synced.Add(1)
233-
go streamOnChangeSubscription(c, sub.GetPath())
237+
go streamOnChangeSubscription(c, nil, sub)
234238
} else {
235239
enqueueFatalMsg(c, fmt.Sprintf("unsupported subscription mode, %v", subMode))
236240
return
@@ -255,19 +259,36 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
255259
}
256260

257261
// streamOnChangeSubscription implements Subscription "ON_CHANGE STREAM" mode
258-
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path) {
262+
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path, sub *gnmipb.Subscription) {
263+
if gnmiPath == nil {
264+
gnmiPath = sub.GetPath()
265+
}
266+
267+
// if heartbeatInterval is not assigned, use 0 to ignore periodical full sync
268+
var heartbeatInterval time.Duration = 0
269+
if sub != nil {
270+
var err error
271+
heartbeatInterval, err = validateHeartbeatInterval(sub)
272+
if err != nil {
273+
enqueueFatalMsg(c, err.Error())
274+
c.synced.Done()
275+
c.w.Done()
276+
return
277+
}
278+
}
279+
259280
tblPaths := c.pathG2S[gnmiPath]
260281
log.V(2).Infof("streamOnChangeSubscription gnmiPath: %v", gnmiPath)
261282

262283
if tblPaths[0].field != "" {
263284
if len(tblPaths) > 1 {
264-
go dbFieldMultiSubscribe(c, gnmiPath, true, time.Millisecond*200, false)
285+
go dbFieldMultiSubscribe(c, gnmiPath, true, heartbeatInterval, false)
265286
} else {
266-
go dbFieldSubscribe(c, gnmiPath, true, time.Millisecond*200)
287+
go dbFieldSubscribe(c, gnmiPath, true, heartbeatInterval)
267288
}
268289
} else {
269290
// sample interval and update only parameters are not applicable
270-
go dbTableKeySubscribe(c, gnmiPath, 0, true)
291+
go dbTableKeySubscribe(c, gnmiPath, heartbeatInterval, true)
271292
}
272293
}
273294

@@ -1340,3 +1361,17 @@ func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) {
13401361
return requestedInterval, nil
13411362
}
13421363
}
1364+
1365+
// validateHeartbeatInterval validates the heartbeat interval of the given subscription.
1366+
func validateHeartbeatInterval(sub *gnmipb.Subscription) (time.Duration, error) {
1367+
requestedInterval := time.Duration(sub.GetHeartbeatInterval())
1368+
if requestedInterval == 0 {
1369+
// If the heartbeat_interval is set to 0, the target MUST create the subscription
1370+
// and send the data with the MinHeartbeatInterval
1371+
return MinHeartbeatInterval, nil
1372+
} else if requestedInterval < MinHeartbeatInterval {
1373+
return 0, fmt.Errorf("invalid heartbeat interval: %v. It cannot be less than %v", requestedInterval, MinHeartbeatInterval)
1374+
} else {
1375+
return requestedInterval, nil
1376+
}
1377+
}

0 commit comments

Comments
 (0)