Skip to content

Commit 9ec1636

Browse files
committed
Fixed xcvrd shutdown flow.
Signed-off-by: Nazarii Hnydyn <[email protected]>
1 parent e5d8155 commit 9ec1636

File tree

1 file changed

+170
-89
lines changed

1 file changed

+170
-89
lines changed

sonic-xcvrd/scripts/xcvrd

+170-89
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ try:
1515
import syslog
1616
import time
1717
import threading
18+
import multiprocessing
1819
from swsscommon import swsscommon
1920
except ImportError, e:
2021
raise ImportError (str(e) + " - required module not found")
@@ -61,6 +62,9 @@ VOLT_UNIT = 'Volts'
6162
POWER_UNIT = 'dBm'
6263
BIAS_UNIT = 'mA'
6364

65+
TIMEOUT_SECS = 1
66+
RUN = True
67+
6468
#========================== Syslog wrappers ==========================
6569

6670
def log_info(msg, also_print_to_console=False):
@@ -90,19 +94,17 @@ def log_error(msg, also_print_to_console=False):
9094
#========================== Signal Handling ==========================
9195

9296
def signal_handler(sig, frame):
97+
global RUN
9398
if sig == signal.SIGHUP:
9499
log_info("Caught SIGHUP - ignoring...")
95-
return
96100
elif sig == signal.SIGINT:
97101
log_info("Caught SIGINT - exiting...")
98-
sys.exit(128 + sig)
102+
RUN = False
99103
elif sig == signal.SIGTERM:
100104
log_info("Caught SIGTERM - exiting...")
101-
sys.exit(128 + sig)
105+
RUN = False
102106
else:
103107
log_warning("Caught unhandled signal '" + sig + "'")
104-
return
105-
106108

107109
#============ Functions to load platform-specific classes ============
108110

@@ -218,7 +220,7 @@ def beautify_dom_info_dict(dom_info_dict):
218220
dom_info_dict['tx3power'] = strip_unit_and_beautify(dom_info_dict['tx3power'], POWER_UNIT)
219221
dom_info_dict['tx4power'] = strip_unit_and_beautify(dom_info_dict['tx4power'], POWER_UNIT)
220222

221-
# update all the sfp info to db
223+
# update port sfp info in db
222224
def post_port_sfp_info_to_db(logical_port_name, table):
223225
ganged_port = False
224226
ganged_member_num = 1
@@ -232,6 +234,9 @@ def post_port_sfp_info_to_db(logical_port_name, table):
232234
ganged_port = True
233235

234236
for physical_port in physical_port_list:
237+
if not RUN:
238+
break
239+
235240
if not platform_sfputil.get_presence(physical_port):
236241
continue
237242

@@ -254,7 +259,7 @@ def post_port_sfp_info_to_db(logical_port_name, table):
254259
log_error("This functionality is currently not implemented for this platform")
255260
sys.exit(3)
256261

257-
# update dom sensor info to db
262+
# update port dom sensor info in db
258263
def post_port_dom_info_to_db(logical_port_name, table):
259264
ganged_port = False
260265
ganged_member_num = 1
@@ -268,6 +273,9 @@ def post_port_dom_info_to_db(logical_port_name, table):
268273
ganged_port = True
269274

270275
for physical_port in physical_port_list:
276+
if not RUN:
277+
break
278+
271279
if not platform_sfputil.get_presence(physical_port):
272280
continue
273281

@@ -300,8 +308,24 @@ def post_port_dom_info_to_db(logical_port_name, table):
300308
log_error("This functionality is currently not implemented for this platform")
301309
sys.exit(3)
302310

303-
# del sfp and dom info from db
304-
def del_port_sfp_dom_info_to_db(logical_port_name, int_tbl, dom_tbl):
311+
# update port dom/sfp info in db
312+
def post_port_sfp_dom_info_to_db():
313+
# Connect to STATE_DB and create transceiver dom/sfp info tables
314+
state_db = swsscommon.DBConnector(swsscommon.STATE_DB,
315+
REDIS_HOSTNAME,
316+
REDIS_PORT,
317+
REDIS_TIMEOUT_MSECS)
318+
int_tbl = swsscommon.Table(state_db, "TRANSCEIVER_INFO")
319+
dom_tbl = swsscommon.Table(state_db, "TRANSCEIVER_DOM_SENSOR")
320+
321+
# Post all the current interface dom/sfp info to STATE_DB
322+
logical_port_list = platform_sfputil.logical
323+
for logical_port_name in logical_port_list:
324+
post_port_sfp_info_to_db(logical_port_name, int_tbl)
325+
post_port_dom_info_to_db(logical_port_name, dom_tbl)
326+
327+
# delete port dom/sfp info from db
328+
def del_port_sfp_dom_info_from_db(logical_port_name, int_tbl, dom_tbl):
305329
ganged_port = False
306330
ganged_member_num = 1
307331

@@ -325,28 +349,130 @@ def del_port_sfp_dom_info_to_db(logical_port_name, int_tbl, dom_tbl):
325349
log_error("This functionality is currently not implemented for this platform")
326350
sys.exit(3)
327351

328-
# Timer thread wrapper class to update dom info to DB periodically
352+
# wait for port config is done
353+
def wait_for_port_config_done():
354+
# Connect to APPL_DB abd subscribe to PORT table notifications
355+
appl_db = swsscommon.DBConnector(swsscommon.APPL_DB,
356+
REDIS_HOSTNAME,
357+
REDIS_PORT,
358+
REDIS_TIMEOUT_MSECS)
359+
360+
sel = swsscommon.Select()
361+
sst = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME)
362+
sel.addSelectable(sst)
363+
364+
# Make sure this daemon started after all port configured
365+
while RUN:
366+
(state, c) = sel.select(SELECT_TIMEOUT_MSECS)
367+
if state == swsscommon.Select.TIMEOUT:
368+
continue
369+
if state != swsscommon.Select.OBJECT:
370+
log_warning("sel.select() did not return swsscommon.Select.OBJECT")
371+
continue
372+
373+
(key, op, fvp) = sst.pop()
374+
if key in ["PortConfigDone", "PortInitDone"]:
375+
break
376+
377+
# Thread wrapper class to update dom info periodically
329378
class dom_info_update_task:
330-
def __init__(self, table):
379+
def __init__(self):
380+
self.task_thread = None
331381
self.task_stopping_event = threading.Event()
332-
self.task_timer = None
333-
self.dom_table = table
334382

335-
def task_run(self):
336-
if self.task_stopping_event.isSet():
337-
log_error("Error: dom info update thread received stop event, exiting...")
338-
return
383+
def task_worker(self):
384+
log_info("Start DOM monitoring loop")
339385

386+
# Connect to STATE_DB and create transceiver dom/sfp info tables
387+
state_db = swsscommon.DBConnector(swsscommon.STATE_DB,
388+
REDIS_HOSTNAME,
389+
REDIS_PORT,
390+
REDIS_TIMEOUT_MSECS)
391+
int_tbl = swsscommon.Table(state_db, "TRANSCEIVER_INFO")
392+
dom_tbl = swsscommon.Table(state_db, "TRANSCEIVER_DOM_SENSOR")
393+
394+
# Start loop to update dom info in DB periodically
395+
while not self.task_stopping_event.wait(DOM_INFO_UPDATE_PERIOD_SECS):
396+
logical_port_list = platform_sfputil.logical
397+
for logical_port_name in logical_port_list:
398+
post_port_dom_info_to_db(logical_port_name, dom_tbl)
399+
400+
# Delete all the information from DB and then exit
340401
logical_port_list = platform_sfputil.logical
341402
for logical_port_name in logical_port_list:
342-
post_port_dom_info_to_db(logical_port_name, self.dom_table)
403+
del_port_sfp_dom_info_from_db(logical_port_name, int_tbl, dom_tbl)
404+
405+
log_info("Stop DOM monitoring loop")
343406

344-
self.task_timer = threading.Timer(DOM_INFO_UPDATE_PERIOD_SECS, self.task_run)
345-
self.task_timer.start()
407+
def task_run(self):
408+
if self.task_stopping_event.is_set():
409+
return
410+
411+
self.task_thread = threading.Thread(target=self.task_worker)
412+
self.task_thread.start()
413+
414+
def task_stop(self):
415+
self.task_stopping_event.set()
416+
self.task_thread.join()
417+
418+
# Process wrapper class to update sfp state info periodically
419+
class sfp_state_update_task:
420+
def __init__(self):
421+
self.task_process = None
422+
self.task_stopping_event = multiprocessing.Event()
423+
424+
def task_worker(self, stopping_event):
425+
log_info("Start SFP monitoring loop")
426+
427+
# Connect to STATE_DB and create transceiver dom/sfp info tables
428+
state_db = swsscommon.DBConnector(swsscommon.STATE_DB,
429+
REDIS_HOSTNAME,
430+
REDIS_PORT,
431+
REDIS_TIMEOUT_MSECS)
432+
int_tbl = swsscommon.Table(state_db, "TRANSCEIVER_INFO")
433+
dom_tbl = swsscommon.Table(state_db, "TRANSCEIVER_DOM_SENSOR")
434+
435+
# Start loop to listen to the sfp change event
436+
while not stopping_event.is_set():
437+
status, port_dict = platform_sfputil.get_transceiver_change_event()
438+
if status:
439+
for key, value in port_dict.iteritems():
440+
logical_port_list = platform_sfputil.get_physical_to_logical(int(key))
441+
for logical_port in logical_port_list:
442+
if value == SFP_STATUS_INSERTED:
443+
log_info("Got SFP inserted event")
444+
rc = post_port_sfp_info_to_db(logical_port, int_tbl)
445+
# If we didn't get the sfp info, assuming the eeprom is not ready, give a try again.
446+
if rc == SFP_EEPROM_NOT_READY:
447+
log_warning("SFP EEPROM is not ready. One more try...")
448+
time.sleep(TIME_FOR_SFP_READY_SECS)
449+
post_port_sfp_info_to_db(logical_port, int_tbl)
450+
post_port_dom_info_to_db(logical_port, dom_tbl)
451+
elif value == SFP_STATUS_REMOVED:
452+
log_info("Got SFP removed event")
453+
del_port_sfp_dom_info_to_db(logical_port, int_tbl, dom_tbl)
454+
else:
455+
# TODO, SFP return error code, need handle accordingly.
456+
continue
457+
else:
458+
# If get_transceiver_change_event() return error, will clean up the DB and then exit
459+
# TODO: next step need to define more error types to handle accordingly.
460+
log_error("Method get_transceiver_change_event() returned error. Exiting...")
461+
os.kill(os.getppid(), signal.SIGTERM)
462+
break
463+
464+
log_info("Stop SFP monitoring loop")
465+
466+
def task_run(self):
467+
if self.task_stopping_event.is_set():
468+
return
469+
470+
self.task_process = multiprocessing.Process(target=self.task_worker,args=(self.task_stopping_event,))
471+
self.task_process.start()
346472

347473
def task_stop(self):
348474
self.task_stopping_event.set()
349-
self.task_timer.join()
475+
os.kill(self.task_process.pid, signal.SIGKILL)
350476

351477
#=============================== Main ================================
352478

@@ -361,7 +487,7 @@ def main():
361487
# Load platform-specific sfputil class
362488
err = load_platform_sfputil()
363489
if err != 0:
364-
log_error("failed to load sfputil")
490+
log_error("Failed to load sfputil", True)
365491
sys.exit(1)
366492

367493
# Load port info
@@ -373,82 +499,37 @@ def main():
373499
log_error("Error reading port info (%s)" % str(e), True)
374500
sys.exit(2)
375501

376-
# Connect to STATE_DB and create transceiver info/dom info table
377-
state_db = swsscommon.DBConnector(swsscommon.STATE_DB,
378-
REDIS_HOSTNAME,
379-
REDIS_PORT,
380-
REDIS_TIMEOUT_MSECS)
381-
int_tbl = swsscommon.Table(state_db, "TRANSCEIVER_INFO")
382-
dom_tbl = swsscommon.Table(state_db, "TRANSCEIVER_DOM_SENSOR")
502+
# Make sure this daemon started after all port configured
503+
log_info("Wait for port config is done")
504+
wait_for_port_config_done()
383505

384-
# Connect to APPL_DB abd subscribe to PORT table notifications
385-
appl_db = swsscommon.DBConnector(swsscommon.APPL_DB,
386-
REDIS_HOSTNAME,
387-
REDIS_PORT,
388-
REDIS_TIMEOUT_MSECS)
506+
# Post all the current interface dom/sfp info to STATE_DB
507+
log_info("Post all port DOM/SFP info to DB")
508+
post_port_sfp_dom_info_to_db()
389509

390-
sel = swsscommon.Select()
391-
sst = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME)
392-
sel.addSelectable(sst)
393-
394-
# Make sure this daemon started after all port configured.
395-
while True:
396-
(state, c) = sel.select(SELECT_TIMEOUT_MSECS)
397-
if state == swsscommon.Select.TIMEOUT:
398-
continue
399-
if state != swsscommon.Select.OBJECT:
400-
log_warning("sel.select() did not return swsscommon.Select.OBJECT")
401-
continue
510+
# Start the dom sensor info update thread
511+
dom_info_update = dom_info_update_task()
512+
dom_info_update.task_run()
402513

403-
(key, op, fvp) = sst.pop()
404-
if key in ["PortConfigDone", "PortInitDone"]:
405-
break
514+
# Start the sfp state info update process
515+
sfp_state_update = sfp_state_update_task()
516+
sfp_state_update.task_run()
406517

407-
# Post all the current interface SFP info to STATE_DB
408-
logical_port_list = platform_sfputil.logical
409-
for logical_port_name in logical_port_list:
410-
post_port_sfp_info_to_db(logical_port_name, int_tbl)
411-
post_port_dom_info_to_db(logical_port_name, dom_tbl)
518+
# Start main loop
519+
log_info("Start daemon main loop")
412520

413-
# Start the dom sensor info update timer thread
414-
dom_info_update = dom_info_update_task(dom_tbl)
415-
dom_info_update.task_run()
521+
while RUN:
522+
time.sleep(TIMEOUT_SECS)
416523

417-
# Start main loop to listen to the SFP change event.
418-
log_info("Start main loop")
419-
while True:
420-
status, port_dict = platform_sfputil.get_transceiver_change_event()
421-
if status:
422-
for key, value in port_dict.iteritems():
423-
logical_port_list = platform_sfputil.get_physical_to_logical(int(key))
424-
for logical_port in logical_port_list:
425-
if value == SFP_STATUS_INSERTED:
426-
rc = post_port_sfp_info_to_db(logical_port, int_tbl)
427-
# If we didn't get the sfp info, assuming the eeprom is not ready, give a try again.
428-
if rc == SFP_EEPROM_NOT_READY:
429-
time.sleep(TIME_FOR_SFP_READY_SECS)
430-
post_port_sfp_info_to_db(logical_port, int_tbl)
431-
post_port_dom_info_to_db(logical_port, dom_tbl)
432-
433-
elif value == SFP_STATUS_REMOVED:
434-
del_port_sfp_dom_info_to_db(logical_port, int_tbl, dom_tbl)
435-
else:
436-
# TODO, SFP return error code, need handle accordingly.
437-
continue
438-
else:
439-
# If get_transceiver_change_event() return error, will clean up the DB and then exit
440-
# TODO: next step need to define more error types to handle accordingly.
441-
break
524+
log_info("Stop daemon main loop")
442525

443-
# Stop the dom info update timer
526+
# Stop the dom sensor info update thread
444527
dom_info_update.task_stop()
445528

446-
# Clean all the information from DB and then exit
447-
logical_port_list = platform_sfputil.logical
448-
for logical_port_name in logical_port_list:
449-
del_port_sfp_dom_info_to_db(logical_port_name, int_tbl, dom_tbl)
450-
log_error("Error: return error from get_transceiver_change_event(), exiting...")
451-
return 1
529+
# Stop the sfp state info update process
530+
sfp_state_update.task_stop()
531+
532+
log_info("Shutting down...")
452533

453534
if __name__ == '__main__':
454535
main()

0 commit comments

Comments
 (0)