Skip to content

Commit 5c73885

Browse files
SNMP agent enhacements and optimizations
1 parent 381ae47 commit 5c73885

30 files changed

+2098
-66
lines changed

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
'mockredispy>=2.9.3',
1111
'pytest',
1212
'pytest-cov',
13+
'bitstring>=3.1.6',
1314
]
1415

1516
high_performance_deps = [

src/ax_interface/agent.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22

33
from .mib import MIBTable, MIBMeta
44
from .socket_io import SocketManager
5+
from .trap import TrapInfra
56

67
# how long to wait before forcibly killing background task(s) during the shutdown procedure.
78
BACKGROUND_WAIT_TIMEOUT = 10 # seconds
89

910

1011
class Agent:
11-
def __init__(self, mib_cls, update_frequency, loop):
12+
def __init__(self, mib_cls, update_frequency, loop, trap_handlers):
1213
if not type(mib_cls) is MIBMeta:
1314
raise ValueError("Expected a class with type: {}".format(MIBMeta))
1415

@@ -19,8 +20,11 @@ def __init__(self, mib_cls, update_frequency, loop):
1920
self.oid_updaters_enabled = asyncio.Event(loop=loop)
2021
self.stopped = asyncio.Event(loop=loop)
2122

23+
# Initialize our Trap infra
24+
self.trap_infra = TrapInfra(loop,trap_handlers)
25+
2226
# Initialize our MIB
23-
self.mib_table = MIBTable(mib_cls, update_frequency)
27+
self.mib_table = MIBTable(mib_cls, update_frequency, self.trap_infra)
2428

2529
# containers
2630
self.socket_mgr = SocketManager(self.mib_table, self.run_enabled, self.loop)
@@ -54,6 +58,8 @@ async def run_in_event_loop(self):
5458
async def shutdown(self):
5559
# allow the agent to quit
5660
self.run_enabled.clear()
61+
# shutdown trap infra
62+
await self.trap_infra.shutdown()
5763
# close the socket
5864
self.socket_mgr.close()
5965
# wait for the agent to signal back

src/ax_interface/mib.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,14 @@ class MIBTable(dict):
257257
Simplistic LUT for Get/GetNext OID. Interprets iterables as keys and implements the same interfaces as dict's.
258258
"""
259259

260-
def __init__(self, mib_cls, update_frequency=DEFAULT_UPDATE_FREQUENCY):
260+
def __init__(self, mib_cls, update_frequency=DEFAULT_UPDATE_FREQUENCY, trap_infra_obj=None):
261261
if type(mib_cls) is not MIBMeta:
262262
raise ValueError("Supplied object is not a MIB class instance.")
263263
super().__init__(getattr(mib_cls, MIBMeta.KEYSTORE))
264264
self.update_frequency = update_frequency
265265
self.updater_instances = getattr(mib_cls, MIBMeta.UPDATERS)
266266
self.prefixes = getattr(mib_cls, MIBMeta.PREFIXES)
267+
self.trap_infra_obj = trap_infra_obj
267268

268269
@staticmethod
269270
def _done_background_task_callback(fut):
@@ -282,6 +283,10 @@ def start_background_tasks(self, event):
282283
fut = asyncio.ensure_future(updater.start())
283284
fut.add_done_callback(MIBTable._done_background_task_callback)
284285
tasks.append(fut)
286+
287+
if self.trap_infra_obj is not None:
288+
fut = asyncio.ensure_future(self.trap_infra_obj.db_listener())
289+
tasks.append(fut)
285290
return asyncio.gather(*tasks, loop=event._loop)
286291

287292
def _find_parent_prefix(self, item):

src/ax_interface/pdu_implementations.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,15 +362,26 @@ def __init__(self, *args, **kwargs):
362362
# header only.
363363

364364

365-
# class NotifyPDU(ContextOptionalPDU):
366-
# """
367-
# https://tools.ietf.org/html/rfc2741#section-6.2.10
368-
# """
369-
# header_type_ = PduTypes.NOTIFY
370-
# TODO: Impl
365+
class NotifyPDU(ContextOptionalPDU):
366+
"""
367+
https://tools.ietf.org/html/rfc2741#section-6.2.10
368+
"""
369+
header_type_ = PduTypes.NOTIFY
371370

371+
def __init__(self, header=None, context=None, payload=None, varBinds=[]):
372+
super().__init__(header=header, context=context, payload=payload)
372373

374+
self.varBinds = varBinds
375+
# Reducing the header length as per RFC 2741
376+
# https://tools.ietf.org/html/rfc2741#section-6.1
377+
payload_len = len(self.encode()) - constants.AGENTX_HEADER_LENGTH
378+
self.header = self.header._replace(payload_length=payload_len)
373379

380+
def encode(self):
381+
ret = super().encode()
382+
for value in self.varBinds:
383+
ret += value.to_bytes(self.header.endianness)
384+
return ret
374385

375386
class PingPDU(ContextOptionalPDU):
376387
"""

src/ax_interface/protocol.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from .encodings import ObjectIdentifier
55
from .pdu import PDUHeader, PDUStream
66
from .pdu_implementations import RegisterPDU, ResponsePDU, OpenPDU
7-
7+
from .trap import TrapInfra
88

99
class AgentX(asyncio.Protocol):
1010
"""
@@ -49,6 +49,7 @@ def opening_handshake(self):
4949

5050
def register_subtrees(self, pdu):
5151
self.session_id = pdu.header.session_id
52+
TrapInfra.protocol_obj = self
5253
logger.info("AgentX session starting with ID: {}".format(self.session_id))
5354

5455
for idx, subtree in enumerate(self.mib_table.prefixes):
@@ -138,7 +139,7 @@ def data_received(self, data):
138139
response_pdu = pdu.make_response(self.mib_table)
139140
self.transport.write(response_pdu.encode())
140141
except exceptions.PDUUnpackError:
141-
logger.exception('decode_error[{}]'.format(data))
142+
logger.warning('decode_error[{}]'.format(data[:1024]))
142143
except exceptions.PDUPackError:
143144
logger.exception('encode_error[{}]'.format(data))
144145
except Exception:

src/ax_interface/trap.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import asyncio
2+
from . import logger, constants
3+
from .mib import ValueType
4+
from .encodings import ObjectIdentifier, ValueRepresentation
5+
from .pdu import PDUHeader
6+
from .pdu_implementations import NotifyPDU
7+
import re
8+
import json
9+
import os
10+
11+
class TrapInfra:
12+
"""
13+
Trap infrastructure's core services are define here.
14+
"""
15+
protocol_obj = None # this will be set in the AgentX protocol class
16+
def __init__(self, loop, trap_handlers):
17+
logger.debug("Init begins for Trap infra")
18+
self.loop = loop
19+
self.redis_instances = dict()
20+
self.db_to_redis_dict = dict()
21+
if trap_handlers is None:
22+
return
23+
self.dbKeyToHandler = dict()
24+
trap_handlers_set = set(trap_handlers)
25+
for t_handler in trap_handlers_set:
26+
t_instance = t_handler()
27+
dbKeys = t_instance.dbKeys
28+
for dbkey in dbKeys:
29+
if dbkey not in self.dbKeyToHandler:
30+
self.dbKeyToHandler[dbkey] = list()
31+
self.dbKeyToHandler[dbkey].append(t_instance)
32+
else:
33+
self.dbKeyToHandler[dbkey].append(t_instance)
34+
t_instance.trap_init()
35+
logger.debug("Init successful for Trap infra")
36+
37+
async def db_listener(self):
38+
"""
39+
Co routine which listens for DB notification events
40+
"""
41+
import aioredis
42+
from aioredis.pubsub import Receiver
43+
44+
logger.debug("starting redis co routine")
45+
logger.info("starting redis DB listener routine")
46+
# Read Config File and setup DB instances
47+
CONFIG_FILE = os.getenv('DB_CONFIG_FILE', "/var/run/redis/sonic-db/database_config.json")
48+
if not os.path.exists(CONFIG_FILE):
49+
raise RuntimeError("[Trap:db_listener - DB config file not found " + str(CONFIG_FILE))
50+
else:
51+
with open(CONFIG_FILE, "r") as config_file:
52+
db_config_data = json.load(config_file)
53+
if not 'INSTANCES' in db_config_data:
54+
raise RuntimeError("[Trap:db_listener - No DB instances found in DB config file")
55+
for instance in db_config_data['INSTANCES']:
56+
entry = db_config_data['INSTANCES'][instance]
57+
if instance not in self.redis_instances:
58+
self.redis_instances[instance] = {"host": entry["hostname"], \
59+
"port": entry["port"], "keyPatterns": [], \
60+
"patternObjs": [], "receiver_handle": None, \
61+
"connection_obj": None \
62+
}
63+
for db in db_config_data['DATABASES']:
64+
entry = db_config_data['DATABASES'][db]
65+
db_id = int(entry["id"])
66+
if db_id not in self.db_to_redis_dict:
67+
if entry["instance"] not in self.redis_instances:
68+
raise RuntimeError("[Trap:db_listener - No DB instance found for " + str(entry["instance"]))
69+
self.db_to_redis_dict[db_id] = self.redis_instances[entry["instance"]]
70+
71+
async def reader(receiver_handle):
72+
logger.debug("Listening for notifications")
73+
async for channel, msg in receiver_handle.iter():
74+
logger.debug("Got {!r} in channel {!r}".format(msg, channel))
75+
self.process_trap(channel,msg)
76+
77+
for instance in self.redis_instances:
78+
address_tuple = (self.redis_instances[instance]['host'], self.redis_instances[instance]['port'])
79+
self.redis_instances[instance]["connection_obj"] = await aioredis.create_redis_pool(address_tuple)
80+
receiver_handle = Receiver(loop=self.loop)
81+
self.redis_instances[instance]["receiver_handle"] = receiver_handle
82+
asyncio.ensure_future(reader(receiver_handle))
83+
84+
for pat in self.dbKeyToHandler.keys():
85+
#Get DB number
86+
db_num = re.match(r'__keyspace@(\d+)__:',pat).group(1)
87+
if db_num is None or db_num == "":
88+
raise RuntimeError("[Trap:db_listener - DB number cannot be determined for key " + str(pat))
89+
90+
db_num = int(db_num)
91+
db_instance = self.db_to_redis_dict[db_num]
92+
db_instance["patternObjs"].append(db_instance["receiver_handle"].pattern(pat))
93+
db_instance["keyPatterns"].append(pat)
94+
95+
for instance in self.redis_instances:
96+
if len(self.redis_instances[instance]["patternObjs"]) == 0:
97+
continue
98+
await self.redis_instances[instance]["connection_obj"].psubscribe(*self.redis_instances[instance]["patternObjs"])
99+
100+
def dispatch_trap(self, varBinds):
101+
"""
102+
Prepare Notify PDU and sends to Master using AgentX protocol
103+
"""
104+
logger.debug("dispatch_trap invoked")
105+
if TrapInfra.protocol_obj is not None:
106+
notifyPDU = NotifyPDU(header=PDUHeader(1, \
107+
constants.PduTypes.NOTIFY, \
108+
PDUHeader.MASK_NEWORK_BYTE_ORDER, 0, \
109+
TrapInfra.protocol_obj.session_id, \
110+
0, 0, 0), varBinds=varBinds)
111+
TrapInfra.protocol_obj.send_pdu(notifyPDU)
112+
logger.debug("processed trap successfully")
113+
else:
114+
logger.warning("Protocol Object is None, cannot process traps")
115+
116+
def process_trap(self, channel, msg):
117+
"""
118+
Invokes trap handlers
119+
"""
120+
db_pattern = channel.name.decode('utf-8')
121+
changed_key = msg[0].decode('utf-8')
122+
123+
for t_instance in self.dbKeyToHandler[db_pattern]:
124+
varbindsDict = t_instance.trap_process(msg, changed_key)
125+
if varbindsDict is None:
126+
continue # no process
127+
assert isinstance(varbindsDict, dict)
128+
assert 'TrapOid' in varbindsDict
129+
assert 'varBinds' in varbindsDict
130+
varbinds = varbindsDict['varBinds']
131+
TrapOid = varbindsDict['TrapOid']
132+
assert isinstance(TrapOid, ObjectIdentifier)
133+
varbindsList = []
134+
# Insert standard SNMP trap
135+
snmpTrapOid = ObjectIdentifier(11, 0, 0, 0, (1, 3, 6, 1, 6, 3, 1, 1, 4, 1, 0))
136+
snmpTrapVarBind = ValueRepresentation(ValueType.OBJECT_IDENTIFIER, 0, snmpTrapOid, TrapOid)
137+
varbindsList.append(snmpTrapVarBind)
138+
if len(varbinds) > 0:
139+
for vb in varbinds:
140+
if not isinstance(vb, ValueRepresentation):
141+
raise RuntimeError("list entry is not of type ValueRepresentation")
142+
else:
143+
varbindsList.append(vb)
144+
else:
145+
raise RuntimeError("Return value must contain atleast one VarBind")
146+
self.dispatch_trap(varbindsList)
147+
148+
async def shutdown(self):
149+
for instance in self.redis_instances:
150+
if len(self.redis_instances[instance]["keyPatterns"]) > 0:
151+
await self.redis_instances[instance]["connection_obj"].punsubscribe(*self.redis_instances[instance]["keyPatterns"])
152+
self.redis_instances[instance]["receiver_handle"].stop()
153+
self.redis_instances[instance]["connection_obj"].close()
154+
await self.redis_instances[instance]["connection_obj"].wait_closed()
155+
156+
class Trap:
157+
"""
158+
Interface for developing Trap handlers
159+
"""
160+
def __init__(self, **kwargs):
161+
self.run_event = asyncio.Event()
162+
assert isinstance(kwargs["dbKeys"], list)
163+
self.dbKeys = kwargs["dbKeys"]
164+
165+
def trap_init(self):
166+
"""
167+
Children may override this method.
168+
"""
169+
logger.info("I am trap_init from infra")
170+
171+
def trap_process(self, dbMessage, changedKey):
172+
"""
173+
Children may override this method.
174+
"""
175+
logger.info("I am trap_process from infra")
176+
177+
178+
179+

src/sonic_ax_impl/__main__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ def install_fragments():
4343
local_filepath = os.path.dirname(os.path.abspath(__file__))
4444
pass_script = os.path.join(local_filepath, 'bin/sysDescr_pass.py')
4545
install_file(pass_script, '/usr/share/snmp', executable=True)
46-
46+
pass_script = os.path.join(local_filepath, 'bin/sysObjectId_pass.py')
47+
install_file(pass_script, '/usr/share/snmp', executable=True)
4748

4849
# Mapping logging log level to SWSS log level
4950
# ref: https://docs.python.org/3/library/logging.html#logging-levels

0 commit comments

Comments
 (0)