@@ -634,29 +634,40 @@ def thread_coming_entry():
634
634
def test_ConfigDBInit ():
635
635
table_name_1 = 'TEST_TABLE_1'
636
636
table_name_2 = 'TEST_TABLE_2'
637
+ table_name_3 = 'TEST_TABLE_3'
637
638
test_key = 'key1'
638
639
test_data = {'field1' : 'value1' }
639
- test_data_update = {'field1' : 'value2' }
640
+
641
+ queue = multiprocessing .Queue ()
640
642
641
643
manager = multiprocessing .Manager ()
642
644
ret_data = manager .dict ()
643
645
644
- def test_handler (table , key , data , ret ):
645
- ret [table ] = {key : data }
646
+ def test_handler (table , key , data , ret , q = None ):
647
+ if data is None :
648
+ ret [table ] = {k : v for k , v in ret [table ].items () if k != key }
649
+ else :
650
+ ret [table ] = {key : data }
651
+
652
+ if q :
653
+ q .put (ret [table ])
646
654
647
- def test_init_handler (data , ret ):
655
+ def test_init_handler (data , ret , queue ):
648
656
ret .update (data )
657
+ queue .put (ret )
649
658
650
- def thread_listen (ret ):
659
+ def thread_listen (ret , queue ):
651
660
config_db = ConfigDBConnector ()
652
661
config_db .connect (wait_for_init = False )
653
662
654
663
config_db .subscribe (table_name_1 , lambda table , key , data : test_handler (table , key , data , ret ),
655
664
fire_init_data = False )
656
665
config_db .subscribe (table_name_2 , lambda table , key , data : test_handler (table , key , data , ret ),
657
666
fire_init_data = True )
667
+ config_db .subscribe (table_name_3 , lambda table , key , data : test_handler (table , key , data , ret , queue ),
668
+ fire_init_data = False )
658
669
659
- config_db .listen (init_data_handler = lambda data : test_init_handler (data , ret ))
670
+ config_db .listen (init_data_handler = lambda data : test_init_handler (data , ret , queue ))
660
671
661
672
config_db = ConfigDBConnector ()
662
673
config_db .connect (wait_for_init = False )
@@ -666,14 +677,26 @@ def thread_listen(ret):
666
677
# Init table data
667
678
config_db .set_entry (table_name_1 , test_key , test_data )
668
679
config_db .set_entry (table_name_2 , test_key , test_data )
680
+ config_db .set_entry (table_name_3 , test_key , {})
669
681
670
- thread = multiprocessing .Process (target = thread_listen , args = (ret_data ,))
682
+ thread = multiprocessing .Process (target = thread_listen , args = (ret_data , queue ))
671
683
thread .start ()
672
- time .sleep (5 )
673
- thread .terminate ()
674
684
675
- assert ret_data [table_name_1 ] == {test_key : test_data }
676
- assert ret_data [table_name_2 ] == {test_key : test_data }
685
+ init_data = queue .get (5 )
686
+
687
+ # Verify that all tables initialized correctly
688
+ assert init_data [table_name_1 ] == {test_key : test_data }
689
+ assert init_data [table_name_2 ] == {test_key : test_data }
690
+ assert init_data [table_name_3 ] == {test_key : {}}
691
+
692
+ # Remove the entry (with no attributes) from the table.
693
+ # Verify that the update is received and a callback is called
694
+ config_db .set_entry (table_name_3 , test_key , None )
695
+
696
+ table_3_data = queue .get (5 )
697
+ assert test_key not in table_3_data
698
+
699
+ thread .terminate ()
677
700
678
701
679
702
def test_DBConnectFailure ():
0 commit comments