Skip to content

Commit 2211b7e

Browse files
authored
Xcvrd should restart if any child thread crashes (sonic-net#326)
* Xcvrd should restart if any child thread crashes Signed-off-by: Mihir Patel <[email protected]> * Resolved test_SfpStateUpdateTask_task_run_stop test failure * Added comment for raise_exception Signed-off-by: Mihir Patel <[email protected]> * Added check for avoiding cmis_manager.start() if CMIS thread is supposed to be skipped. Also, moidified join function to handle accordingly. Added logs for showing names of threads spawned Signed-off-by: Mihir Patel <[email protected]> Signed-off-by: Mihir Patel <[email protected]>
1 parent 753b550 commit 2211b7e

File tree

2 files changed

+266
-88
lines changed

2 files changed

+266
-88
lines changed

sonic-xcvrd/tests/test_xcvrd.py

+140-33
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,95 @@
4040
global_media_settings = media_settings_with_comma_dict['GLOBAL_MEDIA_SETTINGS'].pop('1-32')
4141
media_settings_with_comma_dict['GLOBAL_MEDIA_SETTINGS']['1-5,6,7-20,21-32'] = global_media_settings
4242

43+
class TestXcvrdThreadException(object):
44+
45+
@patch('xcvrd.xcvrd.platform_chassis', MagicMock())
46+
def test_CmisManagerTask_task_run_with_exception(self):
47+
port_mapping = PortMapping()
48+
stop_event = threading.Event()
49+
cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
50+
cmis_manager.wait_for_port_config_done = MagicMock(side_effect = NotImplementedError)
51+
exception_received = None
52+
trace = None
53+
try:
54+
cmis_manager.start()
55+
cmis_manager.join()
56+
except Exception as e1:
57+
exception_received = e1
58+
trace = traceback.format_exc()
59+
60+
assert not cmis_manager.is_alive()
61+
assert(type(exception_received) == NotImplementedError)
62+
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
63+
assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace))
64+
assert("wait_for_port_config_done" in str(trace))
65+
66+
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError))
67+
def test_DomInfoUpdateTask_task_run_with_exception(self):
68+
port_mapping = PortMapping()
69+
stop_event = threading.Event()
70+
dom_info_update = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
71+
exception_received = None
72+
trace = None
73+
try:
74+
dom_info_update.start()
75+
dom_info_update.join()
76+
except Exception as e1:
77+
exception_received = e1
78+
trace = traceback.format_exc()
79+
80+
assert not dom_info_update.is_alive()
81+
assert(type(exception_received) == NotImplementedError)
82+
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
83+
assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace))
84+
assert("subscribe_port_config_change" in str(trace))
85+
86+
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError))
87+
def test_SfpStateUpdateTask_task_run_with_exception(self):
88+
port_mapping = PortMapping()
89+
retry_eeprom_set = set()
90+
stop_event = threading.Event()
91+
sfp_error_event = threading.Event()
92+
sfp_state_update = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
93+
exception_received = None
94+
trace = None
95+
try:
96+
sfp_state_update.start()
97+
sfp_state_update.join()
98+
except Exception as e1:
99+
exception_received = e1
100+
trace = traceback.format_exc()
101+
102+
assert not sfp_state_update.is_alive()
103+
assert(type(exception_received) == NotImplementedError)
104+
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
105+
assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace))
106+
assert("subscribe_port_config_change" in str(trace))
107+
108+
@patch('xcvrd.xcvrd.SfpStateUpdateTask.is_alive', MagicMock(return_value = False))
109+
@patch('xcvrd.xcvrd.DomInfoUpdateTask.is_alive', MagicMock(return_value = False))
110+
@patch('xcvrd.xcvrd.CmisManagerTask.is_alive', MagicMock(return_value = False))
111+
@patch('xcvrd.xcvrd.CmisManagerTask.join', MagicMock(side_effect = NotImplementedError))
112+
@patch('xcvrd.xcvrd.CmisManagerTask.start', MagicMock())
113+
@patch('xcvrd.xcvrd.DomInfoUpdateTask.start', MagicMock())
114+
@patch('xcvrd.xcvrd.SfpStateUpdateTask.start', MagicMock())
115+
@patch('xcvrd.xcvrd.DaemonXcvrd.deinit', MagicMock())
116+
@patch('os.kill')
117+
@patch('xcvrd.xcvrd.DaemonXcvrd.init')
118+
@patch('xcvrd.xcvrd.DomInfoUpdateTask.join')
119+
@patch('xcvrd.xcvrd.SfpStateUpdateTask.join')
120+
def test_DaemonXcvrd_run_with_exception(self, mock_task_join1, mock_task_join2, mock_init, mock_os_kill):
121+
mock_init.return_value = (PortMapping(), set())
122+
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
123+
xcvrd.stop_event.wait = MagicMock()
124+
xcvrd.run()
125+
126+
assert len(xcvrd.threads) == 3
127+
assert mock_init.call_count == 1
128+
assert mock_task_join1.call_count == 1
129+
assert mock_task_join2.call_count == 1
130+
assert mock_os_kill.call_count == 1
131+
43132
class TestXcvrdScript(object):
44133

45134
@patch('xcvrd.xcvrd._wrapper_get_sfp_type')
@@ -482,10 +571,10 @@ def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table
482571

483572
@patch('xcvrd.xcvrd.DaemonXcvrd.init')
484573
@patch('xcvrd.xcvrd.DaemonXcvrd.deinit')
485-
@patch('xcvrd.xcvrd.DomInfoUpdateTask.task_run')
486-
@patch('xcvrd.xcvrd.SfpStateUpdateTask.task_run')
487-
@patch('xcvrd.xcvrd.DomInfoUpdateTask.task_stop')
488-
@patch('xcvrd.xcvrd.SfpStateUpdateTask.task_stop')
574+
@patch('xcvrd.xcvrd.DomInfoUpdateTask.start')
575+
@patch('xcvrd.xcvrd.SfpStateUpdateTask.start')
576+
@patch('xcvrd.xcvrd.DomInfoUpdateTask.join')
577+
@patch('xcvrd.xcvrd.SfpStateUpdateTask.join')
489578
def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init):
490579
mock_init.return_value = (PortMapping(), set())
491580
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
@@ -501,7 +590,8 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1,
501590
@patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD'))
502591
def test_CmisManagerTask_handle_port_change_event(self):
503592
port_mapping = PortMapping()
504-
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
593+
stop_event = threading.Event()
594+
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
505595

506596
assert not task.isPortConfigDone
507597
port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET)
@@ -528,7 +618,8 @@ def test_CmisManagerTask_handle_port_change_event(self):
528618
@patch('xcvrd.xcvrd.XcvrTableHelper')
529619
def test_CmisManagerTask_get_configured_freq(self, mock_table_helper):
530620
port_mapping = PortMapping()
531-
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
621+
stop_event = threading.Event()
622+
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
532623
cfg_port_tbl = MagicMock()
533624
cfg_port_tbl.get = MagicMock(return_value=(True, (('laser_freq', 193100),)))
534625
mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl)
@@ -538,7 +629,8 @@ def test_CmisManagerTask_get_configured_freq(self, mock_table_helper):
538629
@patch('xcvrd.xcvrd.XcvrTableHelper')
539630
def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper):
540631
port_mapping = PortMapping()
541-
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
632+
stop_event = threading.Event()
633+
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
542634
cfg_port_tbl = MagicMock()
543635
cfg_port_tbl.get = MagicMock(return_value=(True, (('tx_power', -10),)))
544636
mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl)
@@ -554,11 +646,12 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis):
554646
mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object, mock_object])
555647

556648
port_mapping = PortMapping()
557-
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
558-
task.wait_for_port_config_done = MagicMock()
559-
task.task_run()
560-
task.task_stop()
561-
assert task.task_process is None
649+
stop_event = threading.Event()
650+
cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
651+
cmis_manager.wait_for_port_config_done = MagicMock()
652+
cmis_manager.start()
653+
cmis_manager.join()
654+
assert not cmis_manager.is_alive()
562655

563656
@patch('xcvrd.xcvrd.platform_chassis')
564657
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None)))
@@ -657,7 +750,8 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
657750
mock_chassis.get_sfp = MagicMock(return_value=mock_sfp)
658751

659752
port_mapping = PortMapping()
660-
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping)
753+
stop_event = threading.Event()
754+
task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
661755

662756
port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET)
663757
task.on_port_update_event(port_change_event)
@@ -708,7 +802,8 @@ def test_CmisManagerTask_task_worker(self, mock_chassis):
708802
@patch('xcvrd.xcvrd.delete_port_from_status_table_hw')
709803
def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw):
710804
port_mapping = PortMapping()
711-
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping)
805+
stop_event = threading.Event()
806+
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
712807
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
713808
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
714809
task.on_port_config_change(port_change_event)
@@ -730,10 +825,11 @@ def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw
730825
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock())
731826
def test_DomInfoUpdateTask_task_run_stop(self):
732827
port_mapping = PortMapping()
733-
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping)
734-
task.task_run()
735-
task.task_stop()
736-
assert not task.task_thread.is_alive()
828+
stop_event = threading.Event()
829+
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
830+
task.start()
831+
task.join()
832+
assert not task.is_alive()
737833

738834
@patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock())
739835
@patch('xcvrd.xcvrd_utilities.sfp_status_helper.detect_port_in_error_status')
@@ -754,7 +850,8 @@ def test_DomInfoUpdateTask_task_worker(self, mock_post_pm_info, mock_update_stat
754850
mock_sub_table.return_value = mock_selectable
755851

756852
port_mapping = PortMapping()
757-
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping)
853+
stop_event = threading.Event()
854+
task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event)
758855
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
759856
task.task_stopping_event.wait = MagicMock(side_effect=[False, True])
760857
mock_detect_error.return_value = True
@@ -785,10 +882,11 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw
785882
mock_table_helper.get_int_tbl = MagicMock(return_value=mock_table)
786883
mock_table_helper.get_dom_tbl = MagicMock(return_value=mock_table)
787884
mock_table_helper.get_dom_threshold_tbl = MagicMock(return_value=mock_table)
788-
stopping_event = multiprocessing.Event()
885+
stop_event = threading.Event()
886+
sfp_error_event = threading.Event()
789887
port_mapping = PortMapping()
790888
retry_eeprom_set = set()
791-
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
889+
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
792890
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
793891
task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl
794892
task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl
@@ -821,15 +919,18 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw
821919
assert not task.port_mapping.logical_to_asic
822920
assert mock_update_status_hw.call_count == 1
823921

922+
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None)))
824923
def test_SfpStateUpdateTask_task_run_stop(self):
825924
port_mapping = PortMapping()
826925
retry_eeprom_set = set()
827-
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
828-
sfp_error_event = multiprocessing.Event()
829-
task.task_run(sfp_error_event)
830-
assert wait_until(5, 1, task.task_process.is_alive)
831-
task.task_stop()
832-
assert wait_until(5, 1, lambda: task.task_process.is_alive() is False)
926+
stop_event = threading.Event()
927+
sfp_error_event = threading.Event()
928+
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
929+
task.start()
930+
assert wait_until(5, 1, task.is_alive)
931+
task.raise_exception()
932+
task.join()
933+
assert wait_until(5, 1, lambda: task.is_alive() is False)
833934

834935
@patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock())
835936
@patch('xcvrd.xcvrd.post_port_sfp_info_to_db')
@@ -840,7 +941,9 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_update_status_hw, mo
840941

841942
port_mapping = PortMapping()
842943
retry_eeprom_set = set()
843-
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
944+
stop_event = threading.Event()
945+
sfp_error_event = threading.Event()
946+
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
844947
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
845948
task.xcvr_table_helper.get_intf_tbl = MagicMock(return_value=mock_table)
846949
task.xcvr_table_helper.get_dom_tbl = MagicMock(return_value=mock_table)
@@ -872,7 +975,9 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_update_status_hw, mo
872975
def test_SfpStateUpdateTask_mapping_event_from_change_event(self):
873976
port_mapping = PortMapping()
874977
retry_eeprom_set = set()
875-
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
978+
stop_event = threading.Event()
979+
sfp_error_event = threading.Event()
980+
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
876981
port_dict = {}
877982
assert task._mapping_event_from_change_event(False, port_dict) == SYSTEM_FAIL
878983
assert port_dict[EVENT_ON_ALL_SFP] == SYSTEM_FAIL
@@ -909,10 +1014,10 @@ def test_SfpStateUpdateTask_task_worker(self, mock_post_pm_info, mock_del_status
9091014
mock_del_dom, mock_change_event, mock_mapping_event, mock_os_kill):
9101015
port_mapping = PortMapping()
9111016
retry_eeprom_set = set()
912-
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
1017+
stop_event = threading.Event()
1018+
sfp_error_event = threading.Event()
1019+
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
9131020
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
914-
stop_event = multiprocessing.Event()
915-
sfp_error_event = multiprocessing.Event()
9161021
mock_change_event.return_value = (True, {0: 0}, {})
9171022
mock_mapping_event.return_value = SYSTEM_NOT_READY
9181023

@@ -1035,7 +1140,9 @@ class MockTable:
10351140

10361141
port_mapping = PortMapping()
10371142
retry_eeprom_set = set()
1038-
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set)
1143+
stop_event = threading.Event()
1144+
sfp_error_event = threading.Event()
1145+
task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event)
10391146
task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE)
10401147
task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl
10411148
task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl

0 commit comments

Comments
 (0)