@@ -1445,6 +1445,7 @@ class ExtConfigDBConnector(ConfigDBConnector):
1445
1445
def __init__ (self , ns_attrs = None ):
1446
1446
super (ExtConfigDBConnector , self ).__init__ ()
1447
1447
self .nosort_attrs = ns_attrs if ns_attrs is not None else {}
1448
+ self .__listen_thread_running = False
1448
1449
def raw_to_typed (self , raw_data , table = '' ):
1449
1450
if len (raw_data ) == 0 :
1450
1451
raw_data = None
@@ -1469,12 +1470,28 @@ def sub_msg_handler(self, msg_item):
1469
1470
except Exception as e :
1470
1471
syslog .syslog (syslog .LOG_ERR , '[bgp cfgd] Failed handling config DB update with exception:' + str (e ))
1471
1472
logging .exception (e )
1473
+
1474
+ def listen_thread (self , timeout ):
1475
+ self .__listen_thread_running = True
1476
+ sub_key_space = "__keyspace@{}__:*" .format (self .get_dbid (self .db_name ))
1477
+ self .pubsub .psubscribe (sub_key_space )
1478
+ while self .__listen_thread_running :
1479
+ msg = self .pubsub .get_message (timeout , True )
1480
+ if msg :
1481
+ self .sub_msg_handler (msg )
1482
+
1483
+ self .pubsub .punsubscribe (sub_key_space )
1484
+
1472
1485
def listen (self ):
1473
1486
"""Start listen Redis keyspace events and will trigger corresponding handlers when content of a table changes.
1474
1487
"""
1475
1488
self .pubsub = self .get_redis_client (self .db_name ).pubsub ()
1476
- self .pubsub .psubscribe (** {"__keyspace@{}__:*" .format (self .get_dbid (self .db_name )): self .sub_msg_handler })
1477
- self .sub_thread = self .pubsub .run_in_thread (sleep_time = 0.01 )
1489
+ self .sub_thread = threading .Thread (target = self .listen_thread , args = (0.01 ,))
1490
+ self .sub_thread .start ()
1491
+
1492
+ def stop_listen (self ):
1493
+ self .__listen_thread_running = False
1494
+
1478
1495
@staticmethod
1479
1496
def get_table_key (table , key ):
1480
1497
return table + '&&' + key
@@ -3774,7 +3791,7 @@ def start(self):
3774
3791
self .subscribe_all ()
3775
3792
self .config_db .listen ()
3776
3793
def stop (self ):
3777
- self .config_db .sub_thread . stop ()
3794
+ self .config_db .stop_listen ()
3778
3795
if self .config_db .sub_thread .is_alive ():
3779
3796
self .config_db .sub_thread .join ()
3780
3797
0 commit comments