6
6
#include < memory>
7
7
#include < condition_variable>
8
8
9
+ #include " NotificationQueue.h"
10
+
9
11
void send_notification (
10
12
_In_ std::string op,
11
13
_In_ std::string data,
@@ -602,95 +604,14 @@ void processNotification(
602
604
603
605
std::condition_variable cv;
604
606
605
- class ntf_queue_t
606
- {
607
- public:
608
- ntf_queue_t () { }
609
- bool enqueue (swss::KeyOpFieldsValuesTuple msg);
610
- bool tryDequeue (swss::KeyOpFieldsValuesTuple& msg);
611
- size_t queueStats ()
612
- {
613
- SWSS_LOG_ENTER ();
614
-
615
- std::lock_guard<std::mutex> lock (queue_mutex);
616
-
617
- return ntf_queue.size ();
618
- }
619
-
620
- private:
621
- std::mutex queue_mutex;
622
- std::queue<swss::KeyOpFieldsValuesTuple> ntf_queue;
623
-
624
- /*
625
- * Value based on typical L2 deployment with 256k MAC entries and
626
- * some extra buffer for other events like port-state, q-deadlock etc
627
- */
628
- const size_t limit = 300000 ;
629
- };
630
-
631
-
632
607
/*
633
608
* Make sure that notification queue pointer is populated before we start
634
609
* thread, and before we create_switch, since at switch_create we can start
635
610
* receiving fdb_notifications which will arrive on different thread and
636
- * will call queueStats () when queue pointer could be null (this=0x0).
611
+ * will call getQueueSize () when queue pointer could be null (this=0x0).
637
612
*/
638
613
639
- static std::unique_ptr<ntf_queue_t > ntf_queue_hdlr = std::unique_ptr<ntf_queue_t >(new ntf_queue_t );
640
-
641
- bool ntf_queue_t::tryDequeue (
642
- _Out_ swss::KeyOpFieldsValuesTuple &item)
643
- {
644
- std::lock_guard<std::mutex> lock (queue_mutex);
645
-
646
- SWSS_LOG_ENTER ();
647
-
648
- if (ntf_queue.empty ())
649
- {
650
- return false ;
651
- }
652
-
653
- item = ntf_queue.front ();
654
-
655
- ntf_queue.pop ();
656
-
657
- return true ;
658
- }
659
-
660
- bool ntf_queue_t::enqueue (
661
- _In_ swss::KeyOpFieldsValuesTuple item)
662
- {
663
- SWSS_LOG_ENTER ();
664
-
665
- std::string notification = kfvKey (item);
666
-
667
- /*
668
- * If the queue exceeds the limit, then drop all further FDB events This is
669
- * a temporary solution to handle high memory usage by syncd and the
670
- * notification queue keeps growing. The permanent solution would be to
671
- * make this stateful so that only the *latest* event is published.
672
- */
673
-
674
- if (queueStats () < limit || notification != " fdb_event" )
675
- {
676
- std::lock_guard<std::mutex> lock (queue_mutex);
677
-
678
- ntf_queue.push (item);
679
- return true ;
680
- }
681
-
682
- static uint32_t log_count = 0 ;
683
-
684
- if (!(log_count % 1000 ))
685
- {
686
- SWSS_LOG_NOTICE (" Too many messages in queue (%zu), dropped %u FDB events!" ,
687
- queueStats (), (log_count+1 ));
688
- }
689
-
690
- ++log_count;
691
-
692
- return false ;
693
- }
614
+ static auto g_notificationQueue = std::make_shared<NotificationQueue>();
694
615
695
616
void enqueue_notification (
696
617
_In_ std::string op,
@@ -703,7 +624,7 @@ void enqueue_notification(
703
624
704
625
swss::KeyOpFieldsValuesTuple item (op, data, entry);
705
626
706
- if (ntf_queue_hdlr ->enqueue (item))
627
+ if (g_notificationQueue ->enqueue (item))
707
628
{
708
629
cv.notify_all ();
709
630
}
@@ -808,7 +729,7 @@ void ntf_process_function()
808
729
809
730
swss::KeyOpFieldsValuesTuple item;
810
731
811
- while (ntf_queue_hdlr ->tryDequeue (item))
732
+ while (g_notificationQueue ->tryDequeue (item))
812
733
{
813
734
processNotification (item);
814
735
}
0 commit comments