1
1
import os
2
2
import sys
3
+ import time
3
4
4
5
import unittest
5
6
if sys .version_info >= (3 , 3 ):
29
30
from xcvrd .xcvrd_utilities .y_cable_helper import *
30
31
from xcvrd .xcvrd_utilities .sfp_status_helper import *
31
32
32
-
33
33
class TestXcvrdScript (object ):
34
34
35
35
def test_xcvrd_helper_class_run (self ):
@@ -335,3 +335,28 @@ def test_is_error_sfp_status(self):
335
335
336
336
assert not is_error_block_eeprom_reading (int (SFP_STATUS_INSERTED ))
337
337
assert not is_error_block_eeprom_reading (int (SFP_STATUS_REMOVED ))
338
+
339
+ def test_sfp_insert_events (self ):
340
+ from xcvrd .xcvrd import _wrapper_soak_sfp_insert_event
341
+ sfp_insert_events = {}
342
+ insert = port_dict = {1 :'1' , 2 :'1' , 3 :'1' , 4 :'1' , 5 :'1' }
343
+ start = time .time ()
344
+ while True :
345
+ _wrapper_soak_sfp_insert_event (sfp_insert_events , insert )
346
+ assert not bool (insert )
347
+ if time .time () - start > MGMT_INIT_TIME_DELAY_SECS :
348
+ break
349
+ assert insert == port_dict
350
+
351
+ def test_sfp_remove_events (self ):
352
+ from xcvrd .xcvrd import _wrapper_soak_sfp_insert_event
353
+ sfp_insert_events = {}
354
+ insert = {1 :'1' , 2 :'1' , 3 :'1' , 4 :'1' , 5 :'1' }
355
+ removal = {1 :'0' , 2 :'0' , 3 :'0' , 4 :'0' , 5 :'0' }
356
+ port_dict = {1 :'0' , 2 :'0' , 3 :'0' , 4 :'0' , 5 :'0' }
357
+ for x in range (5 ):
358
+ _wrapper_soak_sfp_insert_event (sfp_insert_events , insert )
359
+ time .sleep (1 )
360
+ _wrapper_soak_sfp_insert_event (sfp_insert_events , removal )
361
+
362
+ assert port_dict == removal
0 commit comments