28
28
import os
29
29
from functools import reduce
30
30
from .utils import extract_RJ45_ports_index
31
+ from . import module_host_mgmt_initializer
31
32
from . import utils
32
33
from .device_data import DeviceDataManager
33
34
import re
34
- import queue
35
+ import select
35
36
import threading
36
37
import time
37
- from sonic_platform import modules_mgmt
38
38
except ImportError as e :
39
39
raise ImportError (str (e ) + "- required module not found" )
40
40
@@ -132,9 +132,9 @@ def __init__(self):
132
132
133
133
Chassis .chassis_instance = self
134
134
135
- self .modules_mgmt_thread = threading . Thread ()
136
- self .modules_changes_queue = queue . Queue ()
137
- self .modules_mgmt_task_stopping_event = threading . Event ()
135
+ self .module_host_mgmt_initializer = module_host_mgmt_initializer . ModuleHostMgmtInitializer ()
136
+ self .poll_obj = None
137
+ self .registered_fds = None
138
138
139
139
logger .log_info ("Chassis loaded successfully" )
140
140
@@ -344,8 +344,11 @@ def get_all_sfps(self):
344
344
Returns:
345
345
A list of objects derived from SfpBase representing all sfps
346
346
available on this chassis
347
- """
348
- self .initialize_sfp ()
347
+ """
348
+ if DeviceDataManager .is_module_host_management_mode ():
349
+ self .module_host_mgmt_initializer .initialize (self )
350
+ else :
351
+ self .initialize_sfp ()
349
352
return self ._sfp_list
350
353
351
354
def get_sfp (self , index ):
@@ -362,7 +365,10 @@ def get_sfp(self, index):
362
365
An object dervied from SfpBase representing the specified sfp
363
366
"""
364
367
index = index - 1
365
- self .initialize_single_sfp (index )
368
+ if DeviceDataManager .is_module_host_management_mode ():
369
+ self .module_host_mgmt_initializer .initialize (self )
370
+ else :
371
+ self .initialize_single_sfp (index )
366
372
return super (Chassis , self ).get_sfp (index )
367
373
368
374
def get_port_or_cage_type (self , index ):
@@ -412,42 +418,223 @@ def get_change_event(self, timeout=0):
412
418
indicates that fan 0 has been removed, fan 2
413
419
has been inserted and sfp 11 has been removed.
414
420
"""
415
- if not self .modules_mgmt_thread .is_alive ():
416
- # open new SFP change events thread
417
- self .modules_mgmt_thread = modules_mgmt .ModulesMgmtTask (q = self .modules_changes_queue
418
- , main_thread_stop_event = self .modules_mgmt_task_stopping_event )
419
- # Set the thread as daemon so when pmon/xcvrd are shutting down, modules_mgmt will shut down immedietly.
420
- self .modules_mgmt_thread .daemon = True
421
- self .modules_mgmt_thread .start ()
422
- self .initialize_sfp ()
423
- wait_for_ever = (timeout == 0 )
421
+ if DeviceDataManager .is_module_host_management_mode ():
422
+ self .module_host_mgmt_initializer .initialize (self )
423
+ return self .get_change_event_for_module_host_management_mode (timeout )
424
+ else :
425
+ self .initialize_sfp ()
426
+ return self .get_change_event_legacy (timeout )
427
+
428
def get_change_event_for_module_host_management_mode(self, timeout):
    """Get SFP change event when module host management mode is enabled.

    Polls the per-SFP sysfs file descriptors, feeds the resulting events
    into each SFP's state machine and reports the SFPs whose state became
    stable.

    Args:
        timeout: Timeout in milliseconds. If timeout == 0, this method
            blocks until a change is detected. Any value >= 1000 is
            capped to 1000 ms so that the caller regains control quickly
            during shutdown.

    Returns:
        (bool, dict):
            - True if call successful, False if not; - Deprecated, will always return True
            - A nested dictionary where key is a device type,
              value is a dictionary with key:value pairs in the format of
              {'device_id':'device_event'},
              where device_id is the device ID for this device and
                    device_event,
                         status='1' represents device inserted,
                         status='0' represents device removed.
              Ex. {'fan':{'0':'0', '2':'1'}, 'sfp':{'11':'0'}}
                  indicates that fan 0 has been removed, fan 2
                  has been inserted and sfp 11 has been removed.
              When an event is reported, the dictionary also carries an
              'sfp_error' entry mapping port number to error description.
              On timeout, {'sfp': {}} is returned.
    """
    if not self.poll_obj:
        # First call: register every polling fd of every SFP once and keep
        # a fileno -> (sfp index, file object, fd type) lookup table.
        self.poll_obj = select.poll()
        self.registered_fds = {}
        for s in self._sfp_list:
            fds = s.get_fds_for_poling()
            for fd_type, fd in fds.items():
                self.poll_obj.register(fd, select.POLLERR | select.POLLPRI)
                self.registered_fds[fd.fileno()] = (s.sdk_index, fd, fd_type)

        logger.log_debug(f'Registered SFP file descriptors for polling: {self.registered_fds}')

    from . import sfp

    wait_forever = (timeout == 0)
    # poll timeout should be no more than 1000ms to ensure fast shutdown flow.
    # When waiting forever, still poll in 1000ms slices: passing 0 to
    # poll() makes it return immediately, which would turn the loop below
    # into a busy loop instead of a blocking wait.
    timeout = 1000.0 if (wait_forever or timeout >= 1000) else float(timeout)
    port_dict = {}
    error_dict = {}
    begin = time.time()
    wait_ready_task = sfp.SFP.get_wait_ready_task()

    while True:
        fds_events = self.poll_obj.poll(timeout)
        for fileno, _ in fds_events:
            if fileno not in self.registered_fds:
                logger.log_error(f'Unknown file no {fileno} from poll event, registered files are {self.registered_fds}')
                continue

            sfp_index, fd, fd_type = self.registered_fds[fileno]
            s = self._sfp_list[sfp_index]
            # Rewind before reading so the current value is read from the start.
            fd.seek(0)
            fd_value = int(fd.read().strip())

            # Detecting dummy event
            if s.is_dummy_event(fd_type, fd_value):
                # Ignore dummy event for the first poll, assume SDK only provide 1 dummy event
                logger.log_debug(f'Ignore dummy event {fd_type}:{fd_value} for SFP {sfp_index}')
                continue

            logger.log_notice(f'Got SFP event: index={sfp_index}, type={fd_type}, value={fd_value}')
            if fd_type == 'hw_present':
                # event could be EVENT_NOT_PRESENT or EVENT_PRESENT
                event = sfp.EVENT_NOT_PRESENT if fd_value == 0 else sfp.EVENT_PRESENT
                s.on_event(event)
            elif fd_type == 'present':
                if str(fd_value) == sfp.SFP_STATUS_ERROR:
                    # FW control cable got an error, no need trigger state machine
                    sfp_status, error_desc = s.get_error_info_from_sdk_error_type()
                    port_dict[sfp_index + 1] = sfp_status
                    if error_desc:
                        error_dict[sfp_index + 1] = error_desc
                    continue
                elif str(fd_value) == sfp.SFP_STATUS_INSERTED:
                    # FW control cable got present, only case is that the cable is recovering
                    # from an error. FW control cable has no transition from "Not Present" to "Present"
                    # because "Not Present" cable is always "software control" and should always poll
                    # hw_present sysfs instead of present sysfs.
                    port_dict[sfp_index + 1] = sfp.SFP_STATUS_INSERTED
                    continue
                else:
                    s.on_event(sfp.EVENT_NOT_PRESENT)
            else:
                # event could be EVENT_POWER_GOOD or EVENT_POWER_BAD
                event = sfp.EVENT_POWER_BAD if fd_value == 0 else sfp.EVENT_POWER_GOOD
                s.on_event(event)

            if s.in_stable_state():
                s.fill_change_event(port_dict)
                s.refresh_poll_obj(self.poll_obj, self.registered_fds)
            else:
                logger.log_debug(f'SFP {sfp_index} does not reach stable state, state={s.state}')

        # SFPs reported ready by the wait-ready task get EVENT_RESET_DONE.
        ready_sfp_set = wait_ready_task.get_ready_set()
        for sfp_index in ready_sfp_set:
            s = self._sfp_list[sfp_index]
            s.on_event(sfp.EVENT_RESET_DONE)
            if s.in_stable_state():
                s.fill_change_event(port_dict)
                s.refresh_poll_obj(self.poll_obj, self.registered_fds)
            else:
                logger.log_error(f'SFP {sfp_index} failed to reach stable state, state={s.state}')

        if port_dict:
            logger.log_notice(f'Sending SFP change event: {port_dict}, error event: {error_dict}')
            self.reinit_sfps(port_dict)
            return True, {
                'sfp': port_dict,
                'sfp_error': error_dict
            }
        else:
            if not wait_forever:
                elapse = time.time() - begin
                if elapse * 1000 >= timeout:
                    return True, {'sfp': {}}
543
+
544
def get_change_event_legacy(self, timeout):
    """Get SFP change event when module host management is disabled.

    Polls one file descriptor per SFP and reports the module status of
    every SFP for which a non-dummy event was seen.

    Args:
        timeout (int): polling timeout in ms. If timeout == 0, this method
            blocks until a change is detected. Any value >= 1000 is
            capped to 1000 ms so that the caller regains control quickly
            during shutdown.

    Returns:
        (bool, dict):
            - True if call successful, False if not; - Deprecated, will always return True
            - A nested dictionary where key is a device type,
              value is a dictionary with key:value pairs in the format of
              {'device_id':'device_event'},
              where device_id is the device ID for this device and
                    device_event,
                         status='1' represents device inserted,
                         status='0' represents device removed.
              Ex. {'fan':{'0':'0', '2':'1'}, 'sfp':{'11':'0'}}
                  indicates that fan 0 has been removed, fan 2
                  has been inserted and sfp 11 has been removed.
              When an event is reported, the dictionary also carries an
              'sfp_error' entry mapping port number to error description.
              On timeout, {'sfp': {}} is returned.
    """
    if not self.poll_obj:
        self.poll_obj = select.poll()
        self.registered_fds = {}
        # SDK always sent event for the first time polling. Such event should not be sent to xcvrd.
        # Store SFP state before first time polling so that we can detect dummy event.
        self.sfp_states_before_first_poll = {}
        for s in self._sfp_list:
            fd = s.get_fd_for_polling_legacy()
            self.poll_obj.register(fd, select.POLLERR | select.POLLPRI)
            self.registered_fds[fd.fileno()] = (s.sdk_index, fd)
            self.sfp_states_before_first_poll[s.sdk_index] = s.get_module_status()

        logger.log_debug(f'Registered SFP file descriptors for polling: {self.registered_fds}')

    from . import sfp

    wait_forever = (timeout == 0)
    # poll timeout should be no more than 1000ms to ensure fast shutdown flow.
    # When waiting forever, still poll in 1000ms slices: passing 0 to
    # poll() makes it return immediately, which would turn the loop below
    # into a busy loop instead of a blocking wait.
    timeout = 1000.0 if (wait_forever or timeout >= 1000) else float(timeout)
    port_dict = {}
    error_dict = {}
    begin = time.time()

    while True:
        fds_events = self.poll_obj.poll(timeout)
        for fileno, _ in fds_events:
            if fileno not in self.registered_fds:
                logger.log_error(f'Unknown file no {fileno} from poll event, registered files are {self.registered_fds}')
                continue

            sfp_index, fd = self.registered_fds[fileno]
            # Rewind and read the file; the value itself is not used here,
            # the status is taken from get_module_status() below.
            fd.seek(0)
            fd.read()
            s = self._sfp_list[sfp_index]
            sfp_status = s.get_module_status()

            if sfp_index in self.sfp_states_before_first_poll:
                # Detecting dummy event; the stored state is consumed so
                # only the first event of each SFP is checked.
                sfp_state_before_poll = self.sfp_states_before_first_poll.pop(sfp_index)
                if sfp_state_before_poll == sfp_status:
                    # Ignore dummy event for the first poll, assume SDK only provide 1 dummy event
                    logger.log_debug(f'Ignore dummy event {sfp_status} for SFP {sfp_index}')
                    continue

            logger.log_notice(f'Got SFP event: index={sfp_index}, value={sfp_status}')
            if sfp_status == sfp.SFP_STATUS_UNKNOWN:
                # in the following sequence, STATUS_UNKNOWN can be returned.
                # so we shouldn't raise exception here.
                # 1. some sfp module is inserted
                # 2. sfp_event gets stuck and fails to fetch the change event instantaneously
                # 3. and then the sfp module is removed
                # 4. sfp_event starts to try fetching the change event
                logger.log_notice("unknown module state, maybe the port suffers two adjacent insertion/removal")
                continue

            if sfp_status == sfp.SFP_STATUS_ERROR:
                sfp_status, error_desc = s.get_error_info_from_sdk_error_type()
                if error_desc:
                    error_dict[sfp_index + 1] = error_desc
            # Reported port numbers are 1-based.
            port_dict[sfp_index + 1] = sfp_status

        if port_dict:
            logger.log_notice(f'Sending SFP change event: {port_dict}, error event: {error_dict}')
            self.reinit_sfps(port_dict)
            return True, {
                'sfp': port_dict,
                'sfp_error': error_dict
            }
        else:
            if not wait_forever:
                elapse = time.time() - begin
                if elapse * 1000 >= timeout:
                    return True, {'sfp': {}}
450
- i += 1
451
638
452
639
def reinit_sfps (self , port_dict ):
453
640
"""
@@ -457,7 +644,7 @@ def reinit_sfps(self, port_dict):
457
644
"""
458
645
from . import sfp
459
646
for index , status in port_dict .items ():
460
- if status == sfp .SFP_STATUS_INSERTED :
647
+ if status == sfp .SFP_STATUS_REMOVED :
461
648
try :
462
649
self ._sfp_list [int (index ) - 1 ].reinit ()
463
650
except Exception as e :
0 commit comments