@@ -38,6 +38,9 @@ std::shared_ptr<swss::RedisClient> g_redisClient;
38
38
std::shared_ptr<swss::ProducerTable> getResponse;
39
39
std::shared_ptr<swss::NotificationProducer> notifications;
40
40
41
+ std::shared_ptr<std::thread> g_processFlexCounterEventThread;
42
+ volatile bool g_processFlexCounterEventThreadRun = true ;
43
+
41
44
/*
42
45
* TODO: Those are hard coded values for mlnx integration for v1.0.1 they need
43
46
* to be updated.
@@ -2954,25 +2957,98 @@ void processFlexCounterGroupEvent(
2954
2957
}
2955
2958
}
2956
2959
2960
+ std::queue<swss::KeyOpFieldsValuesTuple> g_flexCounterEventQueue;
2961
+
2962
+ bool tryPopFlexCounterEvent (
2963
+ _Out_ swss::KeyOpFieldsValuesTuple& kco)
2964
+ {
2965
+ SWSS_LOG_ENTER ();
2966
+
2967
+ std::lock_guard<std::mutex> lock (g_mutex);
2968
+
2969
+ if (g_flexCounterEventQueue.empty ())
2970
+ {
2971
+ return false ;
2972
+ }
2973
+
2974
+ kco = g_flexCounterEventQueue.front ();
2975
+
2976
+ g_flexCounterEventQueue.pop ();
2977
+
2978
+ return true ;
2979
+ }
2980
+
2981
+ void pushFlexCounterEvent (
2982
+ _In_ const swss::KeyOpFieldsValuesTuple& kco)
2983
+ {
2984
+ SWSS_LOG_ENTER ();
2985
+
2986
+ std::lock_guard<std::mutex> lock (g_mutex);
2987
+
2988
+ g_flexCounterEventQueue.push (kco);
2989
+ }
2990
+
2991
+ bool processFlexCounterEvent (
2992
+ _In_ const swss::KeyOpFieldsValuesTuple kco);
2993
+
2994
+ void processFlexCounterEventThread ()
2995
+ {
2996
+ SWSS_LOG_ENTER ();
2997
+
2998
+ while (g_processFlexCounterEventThreadRun)
2999
+ {
3000
+ swss::KeyOpFieldsValuesTuple kco;
3001
+
3002
+ if (tryPopFlexCounterEvent (kco))
3003
+ {
3004
+ if (!processFlexCounterEvent (kco))
3005
+ {
3006
+ // event was not successfully processed, put it again to the queue
3007
+
3008
+ pushFlexCounterEvent (kco);
3009
+ }
3010
+ }
3011
+
3012
+ sleep (1 );
3013
+ }
3014
+ }
3015
+
2957
3016
void processFlexCounterEvent (
2958
3017
_In_ swss::ConsumerTable &consumer)
2959
3018
{
2960
3019
SWSS_LOG_ENTER ();
2961
3020
2962
3021
swss::KeyOpFieldsValuesTuple kco;
3022
+
2963
3023
{
2964
3024
std::lock_guard<std::mutex> lock (g_mutex);
2965
3025
consumer.pop (kco);
2966
3026
}
2967
3027
3028
+ // because flex counter event can arrive independently (on RIF interface)
3029
+ // it may happen that it will be picked up from the select api before
3030
+ // actual interface will be created, and subscription for counters will
3031
+ // fail, so let's process each request in the thread and use queue for
3032
+ // arriving events, and failed events will be put back to the queue until
3033
+ // they will be processed
3034
+
3035
+ pushFlexCounterEvent (kco);
3036
+ }
3037
+
3038
+ bool processFlexCounterEvent (
3039
+ _In_ const swss::KeyOpFieldsValuesTuple kco)
3040
+ {
3041
+ SWSS_LOG_ENTER ();
3042
+
2968
3043
const auto &key = kfvKey (kco);
2969
- std::string &op = kfvOp (kco);
3044
+ const std::string &op = kfvOp (kco);
2970
3045
2971
3046
std::size_t delimiter = key.find_first_of (" :" );
2972
3047
if (delimiter == std::string::npos)
2973
3048
{
2974
3049
SWSS_LOG_ERROR (" Failed to parse the key %s" , key.c_str ());
2975
- return ;
3050
+
3051
+ return true ; // if key is invalid there is no need to process this event again
2976
3052
}
2977
3053
2978
3054
const auto groupName = key.substr (0 , delimiter);
@@ -2987,7 +3063,7 @@ void processFlexCounterEvent(
2987
3063
SWSS_LOG_WARN (" port VID %s, was not found (probably port was removed/splitted) and will remove from counters now" ,
2988
3064
sai_serialize_object_id (vid).c_str ());
2989
3065
2990
- op = DEL_COMMAND ;
3066
+ return false ;
2991
3067
}
2992
3068
2993
3069
sai_object_type_t objectType = redis_sai_object_type_query (vid); // VID and RID will have the same object type
@@ -3132,6 +3208,8 @@ void processFlexCounterEvent(
3132
3208
3133
3209
FlexCounter::setBufferPoolCounterList (vid, rid, groupName, bufferPoolCounterIds, statsMode);
3134
3210
}
3211
+
3212
+ return true ;
3135
3213
}
3136
3214
3137
3215
void printUsage ()
@@ -3846,6 +3924,11 @@ int syncd_main(int argc, char **argv)
3846
3924
3847
3925
twd.setCallback (timerWatchdogCallback);
3848
3926
3927
+ g_processFlexCounterEventThreadRun = true ;
3928
+
3929
+ g_processFlexCounterEventThread = std::make_shared<std::thread>(processFlexCounterEventThread);
3930
+
3931
+
3849
3932
while (runMainLoop)
3850
3933
{
3851
3934
try
@@ -4029,6 +4112,10 @@ int syncd_main(int argc, char **argv)
4029
4112
4030
4113
#endif
4031
4114
4115
+ g_processFlexCounterEventThreadRun = false ;
4116
+
4117
+ g_processFlexCounterEventThread->join ();
4118
+
4032
4119
FlexCounter::removeAllCounters ();
4033
4120
4034
4121
{
0 commit comments