13
13
from datetime import datetime
14
14
from ipaddress import ip_interface
15
15
from queue import Queue
16
+ from threading import Lock , Event , Thread
16
17
17
18
from swsscommon .swsscommon import ConfigDBConnector , SonicV2Connector , \
18
19
DBConnector , Select , SubscriberStateTable
29
30
30
31
STATE_DB = 'STATE_DB'
31
32
APPL_DB = 'APPL_DB'
33
+ COUNTERS_DB = 'COUNTERS_DB'
34
+ TUNNEL_PKT_COUNTER_TEMPLATE = 'COUNTERS{}IPINIP_TUNNEL_CPU_PKTS'
35
+ COUNTER_KEY = 'RX_COUNT'
32
36
PORTCHANNEL_INTERFACE_TABLE = 'PORTCHANNEL_INTERFACE'
33
37
TUNNEL_TABLE = 'TUNNEL'
34
38
PEER_SWITCH_TABLE = 'PEER_SWITCH'
@@ -69,13 +73,18 @@ def __init__(self):
69
73
self .config_db .connect ()
70
74
self .state_db = SonicV2Connector ()
71
75
self .state_db .connect (STATE_DB )
76
+ self .counters_db = SonicV2Connector ()
77
+ self .counters_db .connect (COUNTERS_DB )
78
+ counters_db_separator = self .counters_db .get_db_separator (COUNTERS_DB )
79
+ self .tunnel_counter_table = TUNNEL_PKT_COUNTER_TEMPLATE .format (counters_db_separator )
72
80
self ._portchannel_intfs = None
73
81
self .up_portchannels = None
74
82
self .netlink_api = IPRoute ()
75
83
self .sniffer = None
76
84
self .self_ip = ''
77
85
self .packet_filter = ''
78
86
self .sniff_intfs = set ()
87
+ self .pending_cmds = Queue ()
79
88
80
89
global portchannel_intfs
81
90
portchannel_intfs = [name for name , _ in self .portchannel_intfs ]
@@ -304,6 +313,27 @@ def start_sniffer(self):
304
313
while not hasattr (self .sniffer , 'stop_cb' ):
305
314
time .sleep (0.1 )
306
315
316
+ def write_count_to_db (self ):
317
+ while True :
318
+ # use a set to automatically deduplicate destination IPs
319
+ to_run = set ()
320
+
321
+ to_run .add (tuple (self .pending_cmds .get ()))
322
+ pkt_count = 1
323
+ while not self .pending_cmds .empty () and len (to_run ) < 100 :
324
+ to_run .add (tuple (self .pending_cmds .get ()))
325
+ # we should always count each packet, but only ping for each unique IP
326
+ pkt_count += 1
327
+
328
+ for cmds in to_run :
329
+ logger .log_info ("Running command '{}'" .format (' ' .join (cmds )))
330
+ subprocess .run (cmds , stdout = subprocess .DEVNULL )
331
+ try :
332
+ curr_count = int (self .counters_db .get (COUNTERS_DB , self .tunnel_counter_table , COUNTER_KEY ))
333
+ except TypeError :
334
+ curr_count = 0
335
+ self .counters_db .set (COUNTERS_DB , self .tunnel_counter_table , COUNTER_KEY , str (curr_count + pkt_count ))
336
+
307
337
def ping_inner_dst (self , packet ):
308
338
"""
309
339
Pings the inner destination IP for an encapsulated packet
@@ -319,8 +349,7 @@ def ping_inner_dst(self, packet):
319
349
cmds .append ('-6' )
320
350
dst_ip = packet [IP ].payload [inner_packet_type ].dst
321
351
cmds .append (dst_ip )
322
- logger .log_info ("Running command '{}'" .format (' ' .join (cmds )))
323
- subprocess .run (cmds , stdout = subprocess .DEVNULL )
352
+ self .pending_cmds .put (cmds )
324
353
325
354
def listen_for_tunnel_pkts (self ):
326
355
"""
@@ -339,7 +368,6 @@ def listen_for_tunnel_pkts(self):
339
368
logger .log_notice ('Starting tunnel packet handler for {}'
340
369
.format (self .packet_filter ))
341
370
342
-
343
371
app_db = DBConnector (APPL_DB , 0 )
344
372
lag_table = SubscriberStateTable (app_db , LAG_TABLE )
345
373
sel = Select ()
@@ -355,7 +383,7 @@ def listen_for_tunnel_pkts(self):
355
383
elif rc == Select .ERROR :
356
384
raise Exception ("Select() error" )
357
385
else :
358
- lag , op , fvs = lag_table .pop ()
386
+ lag , _ , fvs = lag_table .pop ()
359
387
if self .sniffer_restart_required (lag , fvs ):
360
388
self .sniffer .stop ()
361
389
start = datetime .now ()
@@ -374,6 +402,8 @@ def run(self):
374
402
Entry point for the TunnelPacketHandler class
375
403
"""
376
404
self .wait_for_portchannels ()
405
+ db_thread = Thread (target = self .write_count_to_db , daemon = True )
406
+ db_thread .start ()
377
407
self .listen_for_tunnel_pkts ()
378
408
379
409
0 commit comments