Skip to content

Commit 7195611

Browse files
infra for snmp support and linkUp/Down and Config change traps
1 parent e0f36a5 commit 7195611

File tree

11 files changed

+690
-15
lines changed

11 files changed

+690
-15
lines changed

src/ax_interface/agent.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22

33
from .mib import MIBTable, MIBMeta
44
from .socket_io import SocketManager
5+
from .trap import TrapInfra
56

67
# how long to wait before forcibly killing background task(s) during the shutdown procedure.
78
BACKGROUND_WAIT_TIMEOUT = 10 # seconds
89

910

1011
class Agent:
11-
def __init__(self, mib_cls, update_frequency, loop):
12+
def __init__(self, mib_cls, update_frequency, loop, trap_handlers):
1213
if not type(mib_cls) is MIBMeta:
1314
raise ValueError("Expected a class with type: {}".format(MIBMeta))
1415

@@ -19,8 +20,11 @@ def __init__(self, mib_cls, update_frequency, loop):
1920
self.oid_updaters_enabled = asyncio.Event(loop=loop)
2021
self.stopped = asyncio.Event(loop=loop)
2122

23+
# Initialize our Trap infra
24+
self.trap_infra = TrapInfra(loop,trap_handlers)
25+
2226
# Initialize our MIB
23-
self.mib_table = MIBTable(mib_cls, update_frequency)
27+
self.mib_table = MIBTable(mib_cls, update_frequency, self.trap_infra)
2428

2529
# containers
2630
self.socket_mgr = SocketManager(self.mib_table, self.run_enabled, self.loop)
@@ -54,6 +58,8 @@ async def run_in_event_loop(self):
5458
async def shutdown(self):
5559
# allow the agent to quit
5660
self.run_enabled.clear()
61+
# shutdown trap infra
62+
await self.trap_infra.shutdown()
5763
# close the socket
5864
self.socket_mgr.close()
5965
# wait for the agent to signal back

src/ax_interface/mib.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,14 @@ class MIBTable(dict):
257257
Simplistic LUT for Get/GetNext OID. Interprets iterables as keys and implements the same interfaces as dict's.
258258
"""
259259

260-
def __init__(self, mib_cls, update_frequency=DEFAULT_UPDATE_FREQUENCY):
260+
def __init__(self, mib_cls, update_frequency=DEFAULT_UPDATE_FREQUENCY, trap_infra_obj=None):
261261
if type(mib_cls) is not MIBMeta:
262262
raise ValueError("Supplied object is not a MIB class instance.")
263263
super().__init__(getattr(mib_cls, MIBMeta.KEYSTORE))
264264
self.update_frequency = update_frequency
265265
self.updater_instances = getattr(mib_cls, MIBMeta.UPDATERS)
266266
self.prefixes = getattr(mib_cls, MIBMeta.PREFIXES)
267+
self.trap_infra_obj = trap_infra_obj
267268

268269
@staticmethod
269270
def _done_background_task_callback(fut):
@@ -282,6 +283,10 @@ def start_background_tasks(self, event):
282283
fut = asyncio.ensure_future(updater.start())
283284
fut.add_done_callback(MIBTable._done_background_task_callback)
284285
tasks.append(fut)
286+
287+
if self.trap_infra_obj is not None:
288+
fut = asyncio.ensure_future(self.trap_infra_obj.db_listener())
289+
tasks.append(fut)
285290
return asyncio.gather(*tasks, loop=event._loop)
286291

287292
def _find_parent_prefix(self, item):

src/ax_interface/pdu_implementations.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,15 +362,26 @@ def __init__(self, *args, **kwargs):
362362
# header only.
363363

364364

365-
# class NotifyPDU(ContextOptionalPDU):
366-
# """
367-
# https://tools.ietf.org/html/rfc2741#section-6.2.10
368-
# """
369-
# header_type_ = PduTypes.NOTIFY
370-
# TODO: Impl
365+
class NotifyPDU(ContextOptionalPDU):
366+
"""
367+
https://tools.ietf.org/html/rfc2741#section-6.2.10
368+
"""
369+
header_type_ = PduTypes.NOTIFY
371370

371+
def __init__(self, header=None, context=None, payload=None, varBinds=[]):
372+
super().__init__(header=header, context=context, payload=payload)
372373

374+
self.varBinds = varBinds
375+
# Reducing the header length as per RFC 2741
376+
# https://tools.ietf.org/html/rfc2741#section-6.1
377+
payload_len = len(self.encode()) - constants.AGENTX_HEADER_LENGTH
378+
self.header = self.header._replace(payload_length=payload_len)
373379

380+
def encode(self):
381+
ret = super().encode()
382+
for value in self.varBinds:
383+
ret += value.to_bytes(self.header.endianness)
384+
return ret
374385

375386
class PingPDU(ContextOptionalPDU):
376387
"""

src/ax_interface/protocol.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import asyncio
22

33
from . import logger, constants, exceptions
4-
from .encodings import ObjectIdentifier
4+
from .encodings import ObjectIdentifier, ValueRepresentation
55
from .pdu import PDUHeader, PDUStream
66
from .pdu_implementations import RegisterPDU, ResponsePDU, OpenPDU
7-
7+
from .trap import TrapInfra
88

99
class AgentX(asyncio.Protocol):
1010
"""
@@ -49,6 +49,7 @@ def opening_handshake(self):
4949

5050
def register_subtrees(self, pdu):
5151
self.session_id = pdu.header.session_id
52+
TrapInfra.protocol_obj = self
5253
logger.info("AgentX session starting with ID: {}".format(self.session_id))
5354

5455
for idx, subtree in enumerate(self.mib_table.prefixes):

src/ax_interface/trap.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
import pdb
2+
import asyncio
3+
from . import logger, constants, exceptions
4+
from .mib import ValueType
5+
from .encodings import ObjectIdentifier, ValueRepresentation
6+
from .pdu import PDUHeader
7+
from .pdu_implementations import NotifyPDU
8+
import re
9+
import json
10+
import os
11+
import swsssdk
12+
13+
class TrapInfra:
14+
"""
15+
Trap infrastructure's core services are define here.
16+
"""
17+
protocol_obj = None # this will be set in the AgentX protocol class
18+
def __init__(self, loop, trap_handlers):
19+
logger.debug("Init begins for Trap infra")
20+
self.loop = loop
21+
self.redis_instances = dict()
22+
self.db_to_redis_dict = dict()
23+
if trap_handlers is None:
24+
return
25+
self.dbKeyToHandler = dict()
26+
trap_handlers_set = set(trap_handlers)
27+
for t_handler in trap_handlers_set:
28+
t_instance = t_handler()
29+
dbKeys = t_instance.dbKeys
30+
for dbkey in dbKeys:
31+
if dbkey not in self.dbKeyToHandler:
32+
self.dbKeyToHandler[dbkey] = list()
33+
self.dbKeyToHandler[dbkey].append(t_instance)
34+
else:
35+
self.dbKeyToHandler[dbkey].append(t_instance)
36+
t_instance.trap_init()
37+
logger.debug("Init successful for Trap infra")
38+
39+
async def db_listener(self):
40+
"""
41+
Co routine which listens for DB notification events
42+
"""
43+
import aioredis
44+
from aioredis.pubsub import Receiver
45+
46+
logger.debug("starting redis co routine")
47+
logger.info("starting redis DB listener routine")
48+
# Read Config File and setup DB instances
49+
CONFIG_FILE = os.getenv('DB_CONFIG_FILE', "/var/run/redis/sonic-db/database_config.json")
50+
if not os.path.exists(CONFIG_FILE):
51+
raise RuntimeError("[Trap:db_listener - DB config file not found " + str(CONFIG_FILE))
52+
else:
53+
with open(CONFIG_FILE, "r") as config_file:
54+
db_config_data = json.load(config_file)
55+
if not 'INSTANCES' in db_config_data:
56+
raise RuntimeError("[Trap:db_listener - No DB instances found in DB config file")
57+
for instance in db_config_data['INSTANCES']:
58+
entry = db_config_data['INSTANCES'][instance]
59+
if instance not in self.redis_instances:
60+
self.redis_instances[instance] = {"host": entry["hostname"], \
61+
"port": entry["port"], "keyPatterns": [], \
62+
"patternObjs": [], "receiver_handle": None, \
63+
"connection_obj": None \
64+
}
65+
for db in db_config_data['DATABASES']:
66+
entry = db_config_data['DATABASES'][db]
67+
db_id = int(entry["id"])
68+
if db_id not in self.db_to_redis_dict:
69+
if entry["instance"] not in self.redis_instances:
70+
raise RuntimeError("[Trap:db_listener - No DB instance found for " + str(entry["instance"]))
71+
self.db_to_redis_dict[db_id] = self.redis_instances[entry["instance"]]
72+
73+
async def reader(receiver_handle):
74+
logger.debug("Listening for notifications")
75+
async for channel, msg in receiver_handle.iter():
76+
logger.debug("Got {!r} in channel {!r}".format(msg, channel))
77+
self.process_trap(channel,msg)
78+
79+
for instance in self.redis_instances:
80+
address_tuple = (self.redis_instances[instance]['host'], self.redis_instances[instance]['port'])
81+
self.redis_instances[instance]["connection_obj"] = await aioredis.create_redis_pool(address_tuple)
82+
receiver_handle = Receiver(loop=self.loop)
83+
self.redis_instances[instance]["receiver_handle"] = receiver_handle
84+
asyncio.ensure_future(reader(receiver_handle))
85+
86+
for pat in self.dbKeyToHandler.keys():
87+
#Get DB number
88+
db_num = re.match(r'__keyspace@(\d+)__:',pat).group(1)
89+
if db_num is None or db_num == "":
90+
raise RuntimeError("[Trap:db_listener - DB number cannot be determined for key " + str(pat))
91+
92+
db_num = int(db_num)
93+
db_instance = self.db_to_redis_dict[db_num]
94+
db_instance["patternObjs"].append(db_instance["receiver_handle"].pattern(pat))
95+
db_instance["keyPatterns"].append(pat)
96+
97+
for instance in self.redis_instances:
98+
if len(self.redis_instances[instance]["patternObjs"]) == 0:
99+
continue
100+
await self.redis_instances[instance]["connection_obj"].psubscribe(*self.redis_instances[instance]["patternObjs"])
101+
102+
def dispatch_trap(self, varBinds):
103+
"""
104+
Prepare Notify PDU and sends to Master using AgentX protocol
105+
"""
106+
logger.debug("dispatch_trap invoked")
107+
if TrapInfra.protocol_obj is not None:
108+
notifyPDU = NotifyPDU(header=PDUHeader(1, \
109+
constants.PduTypes.NOTIFY, \
110+
PDUHeader.MASK_NEWORK_BYTE_ORDER, 0, \
111+
TrapInfra.protocol_obj.session_id, \
112+
0, 0, 0), varBinds=varBinds)
113+
TrapInfra.protocol_obj.send_pdu(notifyPDU)
114+
logger.debug("processed trap successfully")
115+
else:
116+
logger.warning("Protocol Object is None, cannot process traps")
117+
118+
def process_trap(self, channel, msg):
119+
"""
120+
Invokes trap handlers
121+
"""
122+
db_pattern = channel.name.decode('utf-8')
123+
changed_key = msg[0].decode('utf-8')
124+
125+
for t_instance in self.dbKeyToHandler[db_pattern]:
126+
varbindsDict = t_instance.trap_process(msg, changed_key)
127+
if varbindsDict is None:
128+
continue # no process
129+
assert isinstance(varbindsDict, dict)
130+
assert 'TrapOid' in varbindsDict
131+
assert 'varBinds' in varbindsDict
132+
varbinds = varbindsDict['varBinds']
133+
TrapOid = varbindsDict['TrapOid']
134+
assert isinstance(TrapOid, ObjectIdentifier)
135+
varbindsList = []
136+
# Insert standard SNMP trap
137+
snmpTrapOid = ObjectIdentifier(11, 0, 0, 0, (1, 3, 6, 1, 6, 3, 1, 1, 4, 1, 0))
138+
snmpTrapVarBind = ValueRepresentation(ValueType.OBJECT_IDENTIFIER, 0, snmpTrapOid, TrapOid)
139+
varbindsList.append(snmpTrapVarBind)
140+
if len(varbinds) > 0:
141+
for vb in varbinds:
142+
if not isinstance(vb, ValueRepresentation):
143+
raise RuntimeError("list entry is not of type ValueRepresentation")
144+
else:
145+
varbindsList.append(vb)
146+
else:
147+
raise RuntimeError("Return value must contain atleast one VarBind")
148+
self.dispatch_trap(varbindsList)
149+
150+
async def shutdown(self):
151+
for instance in self.redis_instances:
152+
if len(self.redis_instances[instance]["keyPatterns"]) > 0:
153+
await self.redis_instances[instance]["connection_obj"].punsubscribe(*self.redis_instances[instance]["keyPatterns"])
154+
self.redis_instances[instance]["receiver_handle"].stop()
155+
self.redis_instances[instance]["connection_obj"].close()
156+
await self.redis_instances[instance]["connection_obj"].wait_closed()
157+
158+
class Trap:
159+
"""
160+
Interface for developing Trap handlers
161+
"""
162+
def __init__(self, **kwargs):
163+
self.run_event = asyncio.Event()
164+
assert isinstance(kwargs["dbKeys"], list)
165+
self.dbKeys = kwargs["dbKeys"]
166+
167+
def trap_init(self):
168+
"""
169+
Children may override this method.
170+
"""
171+
logger.info("I am trap_init from infra")
172+
173+
def trap_process(self, dbMessage, changedKey):
174+
"""
175+
Children may override this method.
176+
"""
177+
logger.info("I am trap_process from infra")
178+
179+
180+
181+

src/sonic_ax_impl/main.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import ax_interface
1212
from sonic_ax_impl.mibs import ieee802_1ab
1313
from . import logger
14-
from .mibs.ietf import rfc1213, rfc2737, rfc2863, rfc3433, rfc4292, rfc4363
15-
from .mibs.vendor import dell, cisco
14+
from .mibs.ietf import rfc1213, rfc2737, rfc2863, rfc3433, rfc4292, rfc4363, link_up_down_trap
15+
from .mibs.vendor import dell, cisco, broadcom
1616

1717
# Background task update frequency ( in seconds )
1818
DEFAULT_UPDATE_FREQUENCY = 5
@@ -46,6 +46,8 @@ class SonicMIB(
4646
If SONiC was to create custom MIBEntries, they may be specified here.
4747
"""
4848

49+
# Register Trap handlers
50+
trap_handlers = [link_up_down_trap.linkUpDownTrap, broadcom.enterprise_trap.configChangeTrap]
4951

5052
def shutdown(signame, agent):
5153
# FIXME: If the Agent dies, the background tasks will zombie.
@@ -59,7 +61,7 @@ def main(update_frequency=None):
5961

6062
try:
6163
# initialize handler and set update frequency (or use the default)
62-
agent = ax_interface.Agent(SonicMIB, update_frequency or DEFAULT_UPDATE_FREQUENCY, event_loop)
64+
agent = ax_interface.Agent(SonicMIB, update_frequency or DEFAULT_UPDATE_FREQUENCY, event_loop, trap_handlers)
6365

6466
# add "shutdown" signal handlers
6567
# https://docs.python.org/3.5/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm
@@ -72,7 +74,8 @@ def main(update_frequency=None):
7274
event_loop.run_until_complete(agent.run_in_event_loop())
7375

7476
except Exception:
75-
logger.exception("Uncaught exception in {}".format(__name__))
77+
if shutdown_task is None:
78+
logger.exception("Uncaught exception in {}".format(__name__))
7679
sys.exit(1)
7780
finally:
7881
if shutdown_task is not None:

0 commit comments

Comments
 (0)