1
1
package spine
2
2
3
3
import (
4
+ "crypto/sha256"
5
+ "encoding/hex"
6
+ "encoding/json"
4
7
"errors"
5
8
"fmt"
6
9
"reflect"
10
+ "sort"
7
11
"sync"
8
12
"time"
9
13
@@ -31,7 +35,10 @@ type FeatureLocal struct {
31
35
bindings []* model.FeatureAddressType // bindings to remote features
32
36
subscriptions []* model.FeatureAddressType // subscriptions to remote features
33
37
34
- mux sync.Mutex
38
+ readMsgCache map [model.MsgCounterType ]string // cache for unanswered read messages, so we can filter duplicates and not send them
39
+
40
+ mux sync.Mutex
41
+ muxReadCache sync.RWMutex
35
42
}
36
43
37
44
func NewFeatureLocal (id uint , entity api.EntityLocalInterface , ftype model.FeatureTypeType , role model.RoleType ) * FeatureLocal {
@@ -46,6 +53,7 @@ func NewFeatureLocal(id uint, entity api.EntityLocalInterface, ftype model.Featu
46
53
writeApprovalReceived : make (map [model.MsgCounterType ]int ),
47
54
pendingWriteApprovals : make (map [model.MsgCounterType ]* time.Timer ),
48
55
writeTimeout : defaultMaxResponseDelay ,
56
+ readMsgCache : make (map [model.MsgCounterType ]string ),
49
57
}
50
58
51
59
for _ , fd := range CreateFunctionData [api.FunctionDataCmdInterface ](ftype ) {
@@ -58,6 +66,68 @@ func NewFeatureLocal(id uint, entity api.EntityLocalInterface, ftype model.Featu
58
66
59
67
var _ api.FeatureLocalInterface = (* FeatureLocal )(nil )
60
68
69
+ /* Read Msg Cache */
70
+
71
+ func (r * FeatureLocal ) hashForMessage (destinationAddress * model.FeatureAddressType , cmd model.CmdType ) string {
72
+ cmdString , err := json .Marshal (cmd )
73
+ if err != nil {
74
+ return ""
75
+ }
76
+
77
+ sig := fmt .Sprintf ("%s-%s" , destinationAddress .String (), cmdString )
78
+ shaBytes := sha256 .Sum256 ([]byte (sig ))
79
+ return hex .EncodeToString (shaBytes [:])
80
+ }
81
+
82
+ func (r * FeatureLocal ) msgCounterForHashFromCache (hash string ) * model.MsgCounterType {
83
+ r .muxReadCache .RLock ()
84
+ defer r .muxReadCache .RUnlock ()
85
+
86
+ for msgCounter , h := range r .readMsgCache {
87
+ if h == hash {
88
+ return & msgCounter
89
+ }
90
+ }
91
+
92
+ return nil
93
+ }
94
+
95
+ func (r * FeatureLocal ) hasMsgCounterInCache (msgCounter model.MsgCounterType ) bool {
96
+ r .muxReadCache .RLock ()
97
+ defer r .muxReadCache .RUnlock ()
98
+
99
+ _ , ok := r .readMsgCache [msgCounter ]
100
+
101
+ return ok
102
+ }
103
+
104
+ func (r * FeatureLocal ) addMsgCounterHashToCache (msgCounter model.MsgCounterType , hash string ) {
105
+ r .muxReadCache .Lock ()
106
+ defer r .muxReadCache .Unlock ()
107
+
108
+ // cleanup cache, keep only the last 20 messages
109
+ if len (r .readMsgCache ) > 20 {
110
+ keys := make ([]uint64 , 0 , len (r .readMsgCache ))
111
+ for k := range r .readMsgCache {
112
+ keys = append (keys , uint64 (k ))
113
+ }
114
+ sort .Slice (keys , func (i , j int ) bool { return keys [i ] < keys [j ] })
115
+
116
+ // oldest key is the one with the lowest msgCounterValue
117
+ oldestKey := keys [0 ]
118
+ delete (r .readMsgCache , model .MsgCounterType (oldestKey ))
119
+ }
120
+
121
+ r .readMsgCache [msgCounter ] = hash
122
+ }
123
+
124
+ func (r * FeatureLocal ) removeMsgCounterFromCache (msgCounter model.MsgCounterType ) {
125
+ r .muxReadCache .Lock ()
126
+ defer r .muxReadCache .Unlock ()
127
+
128
+ delete (r .readMsgCache , msgCounter )
129
+ }
130
+
61
131
/* FeatureLocalInterface */
62
132
63
133
func (r * FeatureLocal ) Device () api.DeviceLocalInterface {
@@ -336,8 +406,20 @@ func (r *FeatureLocal) RequestRemoteDataBySenderAddress(
336
406
deviceSki string ,
337
407
destinationAddress * model.FeatureAddressType ,
338
408
maxDelay time.Duration ) (* model.MsgCounterType , * model.ErrorType ) {
409
+ // check if there is an unanswered read message for this destination and cmd and return that msgCounter
410
+ hash := r .hashForMessage (destinationAddress , cmd )
411
+ if len (hash ) > 0 {
412
+ if msgCounterCache := r .msgCounterForHashFromCache (hash ); msgCounterCache != nil {
413
+ return msgCounterCache , nil
414
+ }
415
+ }
416
+
339
417
msgCounter , err := sender .Request (model .CmdClassifierTypeRead , r .Address (), destinationAddress , false , []model.CmdType {cmd })
340
418
if err == nil {
419
+ if len (hash ) > 0 {
420
+ r .addMsgCounterHashToCache (* msgCounter , hash )
421
+ }
422
+
341
423
return msgCounter , nil
342
424
}
343
425
@@ -513,6 +595,12 @@ func (r *FeatureLocal) HandleMessage(message *api.Message) *model.ErrorType {
513
595
return model .NewErrorType (model .ErrorNumberTypeCommandNotSupported , "No function found for cmd data" )
514
596
}
515
597
598
+ if message .RequestHeader != nil &&
599
+ message .RequestHeader .MsgCounterReference != nil &&
600
+ r .hasMsgCounterInCache (* message .RequestHeader .MsgCounterReference ) {
601
+ r .removeMsgCounterFromCache (* message .RequestHeader .MsgCounterReference )
602
+ }
603
+
516
604
switch message .CmdClassifier {
517
605
case model .CmdClassifierTypeResult :
518
606
if err := r .processResult (message ); err != nil {
0 commit comments