Skip to content

Commit d6b2a02

Browse files
authored
add async notification support in active-active topo; refactor code for ycable tasks for change events (sonic-net#327)
Signed-off-by: vaibhav-dahiya [email protected] This PR enhances ycabled to handle a notification from the SoC or server announcing that it is about to be serviced. The notification can arrive at any time, so the client must be listening for (awaiting) it at all times. For this we use the asyncio library and Python's async functionality. The added code defines a GracefulRestartClient class that can be used to send requests to a gRPC server for graceful restart. The class has three async methods: send_request_and_get_response, process_response, and notify_graceful_restart_start. The send_request_and_get_response method retrieves the ToR from request_queue, creates a request with that ToR, and sends it to the server using the stub's NotifyGracefulRestartStart method. It then reads the streamed response from the server, logs the details of the response, and puts it in response_queue. The process_response method retrieves a response from response_queue, prints/puts the response in DB, sleeps for the period mentioned in the response, and then puts the ToR side back in request_queue. We follow the existing ycabled pattern and instantiate the task_worker, which contains all these tasks, in a separate thread. Description Motivation and Context How Has This Been Tested? UT and using this server to validate
1 parent 6202a95 commit d6b2a02

File tree

6 files changed

+258
-1
lines changed

6 files changed

+258
-1
lines changed

sonic-ycabled/proto/proto_out/linkmgr_grpc_driver.proto

+32
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,35 @@ message ServerVersionReply {
4545
}
4646

4747

48+
// Graceful-restart notification service.
service GracefulRestart {
  // Server-streaming RPC: the client sends a single GracefulAdminRequest and
  // receives a stream of GracefulAdminResponse messages in return.
  rpc NotifyGracefulRestartStart(GracefulAdminRequest)
      returns (stream GracefulAdminResponse) {}
}
53+
54+
// ToR side selector; carried in GracefulAdminRequest.tor.
enum ToRSide {
  LOWER_TOR = 0;
  UPPER_TOR = 1;
}
58+
59+
// Request message for GracefulRestart.NotifyGracefulRestartStart.
message GracefulAdminRequest {
  // ToR side the request is issued for.
  ToRSide tor = 1;
}
62+
63+
// Marks whether a notification opens or closes a service window.
enum GracefulRestartMsgType {
  SERVICE_BEGIN = 0;
  // Send this always from SoC side, even if not paired with a Begin.
  SERVICE_END = 1;
}
67+
68+
// Scope of the notification.
// TODO: these values still need proper definitions.
enum GracefulRestartNotifyType {
  CONTROL_PLANE = 0;
  DATA_PLANE = 1;
  BOTH = 2;
}
73+
74+
// One element of the response stream returned by
// GracefulRestart.NotifyGracefulRestartStart.
message GracefulAdminResponse {
  GracefulRestartMsgType msgtype = 1;
  GracefulRestartNotifyType notifytype = 2;
  string guid = 3;
  // Period, in seconds.
  int32 period = 4;
}

sonic-ycabled/tests/test_y_cable_helper.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -2676,7 +2676,6 @@ def test_get_muxcable_static_info_read_side_peer_exceptions(self, platform_sfput
26762676

26772677
status = True
26782678
fvs = [('state', "auto"), ('read_side', 2)]
2679-
26802679
y_cable_tbl[asic_index] = swsscommon.Table(
26812680
test_db[asic_index], "Y_CABLE_TABLE")
26822681
y_cable_tbl[asic_index].get.return_value = (status, fvs)
@@ -7120,3 +7119,19 @@ def test_get_muxcable_info_for_active_active(self):
71207119

71217120

71227121

7122+
@patch("grpc.aio.secure_channel")
@patch("proto_out.linkmgr_grpc_driver_pb2_grpc.GracefulRestartStub")
def test_ycable_graceful_client(self, stub, channel):
    """Construct a GracefulRestartClient against mocked gRPC pieces.

    ``patch`` decorators are applied bottom-up, so the mock for
    ``GracefulRestartStub`` is injected as the first argument and the mock
    for ``grpc.aio.secure_channel`` as the second.
    """
    mock_channel = MagicMock()
    channel.return_value = mock_channel

    mock_stub = MagicMock()
    mock_stub.NotifyGracefulRestartStart = MagicMock(return_value=[4, 5])
    stub.return_value = mock_stub

    read_side = 1
    Y_cable_restart_client = GracefulRestartClient("Ethernet48", None, read_side)

    # The constructor must retain the identity it was given.
    assert Y_cable_restart_client.port == "Ethernet48"
    assert Y_cable_restart_client.read_side == read_side
7137+

sonic-ycabled/tests/test_ycable.py

+32
Original file line numberDiff line numberDiff line change
@@ -367,4 +367,36 @@ def test_ycable_helper_class_run_loop_with_exception(self):
367367
assert("sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py" in str(trace))
368368
assert("swsscommon.Select" in str(trace))
369369

370+
class TestYcableAsyncScript(object):
    """Tests for the YCableAsyncNotificationTask worker thread."""

    @patch("swsscommon.swsscommon.Select", MagicMock(side_effect=NotImplementedError))
    @patch("swsscommon.swsscommon.Select.addSelectable", MagicMock(side_effect=NotImplementedError))
    @patch("swsscommon.swsscommon.Select.select", MagicMock(side_effect=NotImplementedError))
    @patch("swsscommon.swsscommon.Table.get", MagicMock(
        return_value=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))]))
    @patch("ycable.ycable_utilities.y_cable_helper.setup_grpc_channel_for_port", MagicMock(side_effect=NotImplementedError))
    @patch("ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil")
    def test_ycable_helper_async_client_run_loop_with_exception(self, sfputil):
        """An exception raised inside the worker thread must surface from join()."""
        sfputil.logical = ["Ethernet0", "Ethernet4"]
        sfputil.get_asic_id_for_logical_port = MagicMock(return_value=0)
        Y_cable_async_task = YCableAsyncNotificationTask()

        trace = None
        try:
            # start() drives task_worker() through the thread's run(); calling
            # task_worker() directly here would only create a coroutine that
            # is never awaited, so it is not invoked.
            Y_cable_async_task.start()
            # join() re-raises the exception captured by run().
            Y_cable_async_task.join()
        except Exception:
            trace = traceback.format_exc()

        assert("NotImplementedError" in str(trace) and "effect" in str(trace))
        assert("sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py" in str(trace))
        assert("setup_grpc_channel_for_port" in str(trace))
370402

sonic-ycabled/ycable/ycable.py

+3
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,9 @@ def run(self):
389389
y_cable_cli_worker_update = y_cable_helper.YCableCliUpdateTask()
390390
y_cable_cli_worker_update.start()
391391
self.threads.append(y_cable_cli_worker_update)
392+
y_cable_async_noti_worker = y_cable_helper.YCableAsyncNotificationTask()
393+
y_cable_async_noti_worker.start()
394+
self.threads.append(y_cable_async_noti_worker)
392395

393396
# Start main loop
394397
self.log_info("Start daemon main loop")

sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py

+117
Original file line numberDiff line numberDiff line change
@@ -4013,3 +4013,120 @@ def join(self):
40134013

40144014
raise self.exc
40154015

4016+
class GracefulRestartClient:
    """Async gRPC client for the ``GracefulRestart`` service on one port.

    Two long-running coroutines cooperate through a pair of queues:

    * ``send_request_and_get_response`` takes a ToR side from
      ``request_queue``, calls ``NotifyGracefulRestartStart`` and pushes the
      last streamed response (or ``None``) onto ``response_queue``.
    * ``process_response`` takes an item from ``response_queue``, sleeps, and
      puts this client's ToR side back on ``request_queue`` so the request is
      issued again.

    ``notify_graceful_restart_start`` seeds ``request_queue`` to start the
    cycle.
    """

    # Seconds to wait before re-issuing the request when no response was
    # received (empty stream, or RPC failure before the first message).
    RETRY_PERIOD_SECS = 20

    def __init__(self, port, channel: grpc.aio.Channel, read_side):
        """Bind the client to a port, a gRPC channel and the local read side.

        Args:
            port: logical port name; only used in log messages.
            channel: channel handed to ``GracefulRestartStub``.
            read_side: local read side; 0 maps to ``UPPER_TOR``, anything
                else to ``LOWER_TOR`` when re-queueing requests.
        """
        self.port = port
        self.stub = linkmgr_grpc_driver_pb2_grpc.GracefulRestartStub(channel)
        self.request_queue = asyncio.Queue()
        self.response_queue = asyncio.Queue()
        self.read_side = read_side

    def _tor_side(self):
        """Return the ToRSide enum value that corresponds to ``read_side``."""
        if self.read_side == 0:
            return linkmgr_grpc_driver_pb2.ToRSide.UPPER_TOR
        return linkmgr_grpc_driver_pb2.ToRSide.LOWER_TOR

    async def send_request_and_get_response(self):
        """Forever: take a ToR side, stream the RPC, publish the last response.

        An ``grpc.RpcError`` is logged and not propagated; whatever response
        was received last before the failure (possibly ``None``) is still
        published so that ``process_response`` can schedule a retry.
        """
        while True:
            tor = await self.request_queue.get()
            request = linkmgr_grpc_driver_pb2.GracefulAdminRequest(tor=tor)
            response = None
            try:
                response_stream = self.stub.NotifyGracefulRestartStart(request)
                index = 0
                # ``async for`` ends by itself when the server closes the
                # stream, so no explicit end-of-stream comparison is needed.
                async for response in response_stream:
                    helper_logger.log_notice("Async client received from direct read period port = {}: period = {} index = {} guid = {} notifytype {} msgtype = {}".format(self.port, response.period, index, response.guid, response.notifytype, response.msgtype))
                    helper_logger.log_debug("Async Debug only :{} {}".format(dir(response_stream), dir(response)))
                    index += 1
                helper_logger.log_notice("Async client finished loop from direct read period port:{} ".format(self.port))
            except grpc.RpcError as e:
                helper_logger.log_notice("Async client port = {} exception occured because of {} ".format(self.port, e.code()))

            await self.response_queue.put(response)

    async def process_response(self):
        """Forever: take a response, wait, then re-arm the request queue.

        The wait is ``response.period`` seconds when a response is available
        and ``RETRY_PERIOD_SECS`` otherwise.
        """
        while True:
            response = await self.response_queue.get()
            helper_logger.log_debug("Async recieved a response from {} {}".format(self.port, response))
            # do something with response
            if response is not None:
                delay = response.period
            else:
                delay = self.RETRY_PERIOD_SECS
            await asyncio.sleep(delay)

            await self.request_queue.put(self._tor_side())

    async def notify_graceful_restart_start(self, tor: linkmgr_grpc_driver_pb2.ToRSide):
        """Queue a request for ``tor``; the sender coroutine picks it up."""
        await self.request_queue.put(tor)
4064+
4065+
class YCableAsyncNotificationTask(threading.Thread):
    """Thread that runs the asyncio graceful-restart clients for all ports.

    ``run`` executes ``task_worker`` with ``asyncio.run``. Any exception that
    escapes is logged, stored in ``self.exc`` and re-raised from ``join`` so
    the parent thread can observe the failure.
    """

    def __init__(self):
        threading.Thread.__init__(self)

        self.exc = None
        self.task_stopping_event = threading.Event()
        self.table_helper = y_cable_table_helper.YcableAsyncNotificationTableHelper()
        self.read_side = process_loopback_interface_and_get_read_side(self.table_helper.loopback_keys)

    async def task_worker(self):
        """Create the client coroutines for every eligible port and await them.

        A port is eligible when its port-table entry can be read and contains
        both ``state`` and ``soc_ipv4``. For each such port three tasks are
        scheduled: the request sender, the response processor, and one
        initial notification that seeds the request queue.
        """
        # The ToR side depends only on this task's read side, so it is the
        # same for every port.
        if self.read_side == 0:
            tor_side = linkmgr_grpc_driver_pb2.ToRSide.UPPER_TOR
        else:
            tor_side = linkmgr_grpc_driver_pb2.ToRSide.LOWER_TOR

        # Create tasks for all ports
        tasks = []
        for logical_port_name in y_cable_platform_sfputil.logical:
            if self.task_stopping_event.is_set():
                break

            # Get the asic to which this port belongs
            asic_index = y_cable_platform_sfputil.get_asic_id_for_logical_port(logical_port_name)
            port_tbl = self.table_helper.get_port_tbl()[asic_index]
            (status, fvs) = port_tbl.get(logical_port_name)
            if status is False:
                helper_logger.log_warning(
                    "Could not retreive fieldvalue pairs for {}, inside config_db table {}".format(logical_port_name, port_tbl.getTableName()))
                continue

            # Convert list of tuples to a dictionary
            mux_table_dict = dict(fvs)
            if "state" not in mux_table_dict or "soc_ipv4" not in mux_table_dict:
                continue

            soc_ipv4_full = mux_table_dict.get("soc_ipv4", None)
            if soc_ipv4_full is None:
                continue
            # Strip the prefix length, e.g. "192.168.0.1/32" -> "192.168.0.1".
            soc_ipv4 = soc_ipv4_full.split('/')[0]

            # NOTE(review): the trailing True presumably requests an async
            # channel from the helper -- confirm against its definition.
            channel, _ = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, self.table_helper.get_grpc_config_tbl(), self.table_helper.get_fwd_state_response_tbl(), True)

            # Use the read side computed for this task, as everywhere else in
            # this class.
            client = GracefulRestartClient(logical_port_name, channel, self.read_side)
            tasks.append(asyncio.create_task(client.send_request_and_get_response()))
            tasks.append(asyncio.create_task(client.process_response()))
            tasks.append(asyncio.create_task(client.notify_graceful_restart_start(tor_side)))

        await asyncio.gather(*tasks)

    def run(self):
        """Thread entry point: run ``task_worker`` unless already stopping."""
        if self.task_stopping_event.is_set():
            return

        try:
            asyncio.run(self.task_worker())
        except Exception as e:
            helper_logger.log_error("Exception occured at child thread YCableAsyncNotificationTask due to {} {}".format(repr(e), traceback.format_exc()))
            # Kept so that join() can re-raise it in the caller's thread.
            self.exc = e

    def join(self):
        """Wait for the thread to finish and re-raise any captured exception."""
        threading.Thread.join(self)

        helper_logger.log_info("stopped all thread")
        if self.exc is not None:
            raise self.exc

sonic-ycabled/ycable/ycable_utilities/y_cable_table_helper.py

+58
Original file line numberDiff line numberDiff line change
@@ -471,3 +471,61 @@ def get_appl_db(self):
471471
return self.appl_db
472472

473473

474+
class YcableAsyncNotificationTableHelper(object):
    """Per-ASIC database connections and tables for the async notification task.

    Every attribute is a dict keyed by ASIC index, populated once in
    ``__init__`` for each front-end namespace.
    """

    def __init__(self):
        self.state_db = {}
        self.config_db = {}
        self.appl_db = {}
        self.port_tbl = {}
        self.status_tbl = {}
        self.y_cable_tbl = {}
        self.mux_tbl = {}
        self.grpc_config_tbl = {}
        self.fwd_state_response_tbl = {}
        self.loopback_tbl = {}
        self.loopback_keys = {}

        # Walk the front-end namespaces of the platform, one ASIC each.
        for ns in multi_asic.get_front_end_namespaces():
            idx = multi_asic.get_asic_index_from_namespace(ns)

            # Database connections.
            state_conn = daemon_base.db_connect("STATE_DB", ns)
            self.state_db[idx] = state_conn
            appl_conn = daemon_base.db_connect("APPL_DB", ns)
            self.appl_db[idx] = appl_conn
            config_conn = daemon_base.db_connect("CONFIG_DB", ns)
            self.config_db[idx] = config_conn

            # Tables on top of those connections.
            self.port_tbl[idx] = swsscommon.Table(config_conn, "MUX_CABLE")
            self.status_tbl[idx] = swsscommon.Table(state_conn, TRANSCEIVER_STATUS_TABLE)
            self.y_cable_tbl[idx] = swsscommon.Table(state_conn, swsscommon.STATE_HW_MUX_CABLE_TABLE_NAME)
            self.mux_tbl[idx] = swsscommon.Table(state_conn, MUX_CABLE_INFO_TABLE)
            self.grpc_config_tbl[idx] = swsscommon.Table(config_conn, "GRPCCLIENT")
            self.fwd_state_response_tbl[idx] = swsscommon.Table(appl_conn, "FORWARDING_STATE_RESPONSE")

            loopback = swsscommon.Table(config_conn, "LOOPBACK_INTERFACE")
            self.loopback_tbl[idx] = loopback
            # Snapshot of the loopback keys taken at construction time.
            self.loopback_keys[idx] = loopback.getKeys()

    def get_state_db(self):
        """Return the STATE_DB connections keyed by ASIC index."""
        return self.state_db

    def get_config_db(self):
        """Return the CONFIG_DB connections keyed by ASIC index."""
        return self.config_db

    def get_port_tbl(self):
        """Return the CONFIG_DB "MUX_CABLE" tables keyed by ASIC index."""
        return self.port_tbl

    def get_status_tbl(self):
        """Return the STATE_DB transceiver status tables keyed by ASIC index."""
        return self.status_tbl

    def get_y_cable_tbl(self):
        """Return the STATE_DB HW mux cable tables keyed by ASIC index."""
        return self.y_cable_tbl

    def get_mux_tbl(self):
        """Return the STATE_DB mux cable info tables keyed by ASIC index."""
        return self.mux_tbl

    def get_grpc_config_tbl(self):
        """Return the CONFIG_DB "GRPCCLIENT" tables keyed by ASIC index."""
        return self.grpc_config_tbl

    def get_fwd_state_response_tbl(self):
        """Return the APPL_DB "FORWARDING_STATE_RESPONSE" tables keyed by ASIC index."""
        return self.fwd_state_response_tbl

0 commit comments

Comments
 (0)