From 6ea242892c2046725beaedb24d9e8d7937737f03 Mon Sep 17 00:00:00 2001 From: Asad Raza Date: Mon, 13 May 2024 18:42:59 +0500 Subject: [PATCH 01/12] Add service and handler files for DHCP DoS logger process --- .../dhcp_dos_logger/dhcp_dos_logger.py | 35 +++++++++++++++++++ .../dhcp_dos_logger/dhcp_dos_logger.service | 14 ++++++++ 2 files changed, 49 insertions(+) create mode 100644 files/image_config/dhcp_dos_logger/dhcp_dos_logger.py create mode 100644 files/image_config/dhcp_dos_logger/dhcp_dos_logger.service diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py new file mode 100644 index 000000000000..a5eb9f94a4cc --- /dev/null +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 + +import re +import os +import subprocess +import time +from sonic_py_common.logger import Logger +from swsscommon.swsscommon import ConfigDBConnector + +SYSLOG_IDENTIFIER = os.path.basename(__file__) + +# Global logger instance +logger = Logger(SYSLOG_IDENTIFIER) +logger.log_info("Starting DHCP DoS logger...") + +# Connect to config db +config_db = ConfigDBConnector() +config_db.connect() + +# Initialize +drop_pkts = {} + +# Get list of ports +ports = config_db.get_table('PORT').keys() + +for port in ports: + drop_pkts[port] = 0 + +# Main handler function +def handler(): + while True: + for port in drop_pkts.keys(): + output = subprocess.run(["tc -s qdisc show dev {} handle ffff:".format(str(port))], shell=True, capture_output=True) + if output is not None: + match = re.search(r'dropped (\d+)', output.stdout) \ No newline at end of file diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service new file mode 100644 index 000000000000..6bf15b6fc158 --- /dev/null +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service @@ -0,0 +1,14 @@ +[Unit] +Description=Log DHCP rate limit violations +Requires=config-setup.service +After=config-setup.service +BindsTo=sonic.target +After=sonic.target +After=network.target + +[Service] +Type=simple +ExecStart=/usr/bin/dhcp_dos_logger.py + +[Install] +WantedBy=sonic.target \ No newline at end of file From efe9e834561dce3b31f494753c68718da198ab4c Mon Sep 17 00:00:00 2001 From: Muhammad Ali Hussnain <91005947+muhammadalihussnain@users.noreply.github.com> Date: Tue, 14 May 2024 11:50:59 +0000 Subject: [PATCH 02/12] commited obtaining and matching than initail counter --- .../dhcp_dos_logger/dhcp_dos_logger.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py index a5eb9f94a4cc..1935cc9c0b7c 100644 --- a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py @@ -23,13 +23,21 @@ # Get list of ports ports = config_db.get_table('PORT').keys() -for port in ports: - drop_pkts[port] = 0 +drop_pkts = {port: 0 for port in get_ports()} # Main handler function def handler(): while True: for port in drop_pkts.keys(): - output = subprocess.run(["tc -s qdisc show dev {} handle ffff:".format(str(port))], shell=True, capture_output=True) + + output = subprocess.run(["tc", "-s", "qdisc", "show", "dev", str(port), "handle", "ffff:"], shell=True, capture_output=True) + + #output = subprocess.run(["tc -s qdisc show dev {} handle ffff:".format(str(port))], shell=True, capture_output=True) if output is not None: - match = re.search(r'dropped (\d+)', output.stdout) \ No newline at end of file + match = re.search(r'dropped (\d+)', output.stdout) + + if int(match) > drop_pkts[port]: + logger.log_info(f"Port {port}: Current DHCP drop counter is {int(match)}") + drop_pkts[port] = int(match) + else: + logger.log_warning(f"No dropped packets found on port {port}") \ No newline at end of file From 893bd88d1feb3c690afca500dba3af41a08aa2ac Mon Sep 17 00:00:00 2001 From: Muhammad Ali Hussnain <91005947+muhammadalihussnain@users.noreply.github.com> Date: Tue, 14 May 2024 12:02:19 +0000 Subject: [PATCH 03/12] shell= False --- files/image_config/dhcp_dos_logger/dhcp_dos_logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py index 1935cc9c0b7c..7e3c01a51bac 100644 --- a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py @@ -30,7 +30,7 @@ def handler(): while True: for port in drop_pkts.keys(): - output = subprocess.run(["tc", "-s", "qdisc", "show", "dev", str(port), "handle", "ffff:"], shell=True, capture_output=True) + output = subprocess.run(["tc", "-s", "qdisc", "show", "dev", str(port), "handle", "ffff:"], shell=False, capture_output=True) #output = subprocess.run(["tc -s qdisc show dev {} handle ffff:".format(str(port))], shell=True, capture_output=True) if output is not None: From 0014f64ac1551ebe73603184b50139e9f919e22b Mon Sep 17 00:00:00 2001 From: Muhammad Ali Hussnain <91005947+muhammadalihussnain@users.noreply.github.com> Date: Wed, 15 May 2024 05:15:08 +0000 Subject: [PATCH 04/12] Entry Point Added --- .../dhcp_dos_logger/dhcp_dos_logger.py | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py index 7e3c01a51bac..826605109fd2 100644 --- a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py @@ -23,21 +23,43 @@ # Get list of ports ports = config_db.get_table('PORT').keys() -drop_pkts = {port: 0 for port in get_ports()} +# Initialize the ports with zero initial packet drops +drop_pkts = {port: 0 for port in ports} +# Main handler function # Main handler function def handler(): + """ + Continuously monitors ports for dropped DHCP packets and logs them. + """ while True: for port in drop_pkts.keys(): + try: + output = subprocess.run(["tc", "-s", "qdisc", "show", "dev", str(port), "handle", "ffff:"], shell=True, capture_output=True) + if output.returncode == 0: # Check for successful execution + match = re.search(r'dropped (\d+)', output.stdout) + if match: + dropped_count = int(match.group(1)) + if dropped_count > drop_pkts[port]: + logger.log_info(f"Port {port}: Current DHCP drop counter is {dropped_count}") + drop_pkts[port] = dropped_count + else: + logger.log_warning(f"No new dropped packets found on port {port}") + else: + logger.log_warning(f"No dropped packet information found for port {port}") + except subprocess.CalledProcessError as e: + logger.log_error(f"Error executing 'tc' command: {e}") + + time.sleep(10) # Adjust sleep time as needed (e.g., check for dropped packets every 10 seconds) + - output = subprocess.run(["tc", "-s", "qdisc", "show", "dev", str(port), "handle", "ffff:"], shell=False, capture_output=True) +# Entry point function +def main(): + """ + Entry point for the daemon. + """ + handler() - #output = subprocess.run(["tc -s qdisc show dev {} handle ffff:".format(str(port))], shell=True, capture_output=True) - if output is not None: - match = re.search(r'dropped (\d+)', output.stdout) - if int(match) > drop_pkts[port]: - logger.log_info(f"Port {port}: Current DHCP drop counter is {int(match)}") - drop_pkts[port] = int(match) - else: - logger.log_warning(f"No dropped packets found on port {port}") \ No newline at end of file +if __name__ == "__main__": + main() \ No newline at end of file From 33fb23a3f55307994a82a64a06dbc9a3aaed7b3e Mon Sep 17 00:00:00 2001 From: Muhammad Ali Hussnain <91005947+muhammadalihussnain@users.noreply.github.com> Date: Wed, 15 May 2024 05:17:01 +0000 Subject: [PATCH 05/12] Entry Point + shell=False --- files/image_config/dhcp_dos_logger/dhcp_dos_logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py index 826605109fd2..8f62307c6d4a 100644 --- a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py @@ -35,7 +35,7 @@ def handler(): while True: for port in drop_pkts.keys(): try: - output = subprocess.run(["tc", "-s", "qdisc", "show", "dev", str(port), "handle", "ffff:"], shell=True, capture_output=True) + output = subprocess.run(["tc", "-s", "qdisc", "show", "dev", str(port), "handle", "ffff:"], shell=False, capture_output=True) if output.returncode == 0: # Check for successful execution match = re.search(r'dropped (\d+)', output.stdout) if match: From da2151692a44c5e2391a50abd58988805a7b009c Mon Sep 17 00:00:00 2001 From: Muhammad Ali Hussnain <91005947+muhammadalihussnain@users.noreply.github.com> Date: Wed, 15 May 2024 10:18:42 +0000 Subject: [PATCH 06/12] nothing seen --- files/image_config/dhcp_dos_logger/dhcp_dos_logger.service | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service index 6bf15b6fc158..7ea8866444fc 100644 --- a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service @@ -8,7 +8,6 @@ After=network.target [Service] Type=simple -ExecStart=/usr/bin/dhcp_dos_logger.py - +ExecStart=/usr/bin/dhcp_dos_logger.py [Install] WantedBy=sonic.target \ No newline at end of file From 0219f086f9e557e34acb361d357a8ebdfed38917 Mon Sep 17 00:00:00 2001 From: Muhammad Ali Hussnain <91005947+muhammadalihussnain@users.noreply.github.com> Date: Wed, 15 May 2024 18:26:08 +0000 Subject: [PATCH 07/12] changed path of dhcp_dos_logger.py file in service file --- files/image_config/dhcp_dos_logger/dhcp_dos_logger.service | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service index 7ea8866444fc..181a76323e5f 100644 --- a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service @@ -8,6 +8,7 @@ After=network.target [Service] Type=simple -ExecStart=/usr/bin/dhcp_dos_logger.py +ExecStart= /workspaces/sonic-buildimage/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py [Install] -WantedBy=sonic.target \ No newline at end of file +WantedBy=sonic.target + From d54ad45c626d51fd461be60971a5f44a9c99d450 Mon Sep 17 00:00:00 2001 From: Asad Raza Date: Wed, 29 May 2024 13:51:36 +0000 Subject: [PATCH 08/12] Fix for log messages --- files/image_config/dhcp_dos_logger/dhcp_dos_logger.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py index 8f62307c6d4a..a25140593763 100644 --- a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py @@ -26,7 +26,6 @@ # Initialize the ports with zero initial packet drops drop_pkts = {port: 0 for port in ports} -# Main handler function # Main handler function def handler(): """ @@ -41,16 +40,16 @@ def handler(): if match: dropped_count = int(match.group(1)) if dropped_count > drop_pkts[port]: - logger.log_info(f"Port {port}: Current DHCP drop counter is {dropped_count}") + logger.log_warning(f"Port {port}: Current DHCP drop counter is {dropped_count}") drop_pkts[port] = dropped_count else: - logger.log_warning(f"No new dropped packets found on port {port}") + pass else: - logger.log_warning(f"No dropped packet information found for port {port}") + logger.log_warning(f"Failed to get dropped packet information for port {port}") except subprocess.CalledProcessError as e: logger.log_error(f"Error executing 'tc' command: {e}") - time.sleep(10) # Adjust sleep time as needed (e.g., check for dropped packets every 10 seconds) + time.sleep(10) # Entry point function From 57c07c58860ac420fdec9e2fb23951311af7f026 Mon Sep 17 00:00:00 2001 From: Asad Raza Date: Fri, 31 May 2024 17:48:13 +0500 Subject: [PATCH 09/12] Add path for new service and handler files in debian template --- files/build_templates/sonic_debian_extension.j2 | 5 +++++ files/image_config/dhcp_dos_logger/dhcp_dos_logger.py | 2 +- files/image_config/dhcp_dos_logger/dhcp_dos_logger.service | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/files/build_templates/sonic_debian_extension.j2 b/files/build_templates/sonic_debian_extension.j2 index 2889b2469960..da2bea82d76f 100644 --- a/files/build_templates/sonic_debian_extension.j2 +++ b/files/build_templates/sonic_debian_extension.j2 @@ -480,6 +480,11 @@ sudo cp $IMAGE_CONFIGS/copp/copp-config.sh $FILESYSTEM_ROOT/usr/bin/ sudo cp $IMAGE_CONFIGS/copp/copp_cfg.j2 $FILESYSTEM_ROOT_USR_SHARE_SONIC_TEMPLATES/ echo "copp-config.service" | sudo tee -a $GENERATED_SERVICE_FILE +# Copy DHCP DoS logger configuration files +sudo cp $IMAGE_CONFIGS/dhcp_dos_logger/dhcp_dos_logger.service $FILESYSTEM_ROOT_USR_LIB_SYSTEMD_SYSTEM +sudo cp $IMAGE_CONFIGS/dhcp_dos_logger/dhcp_dos_logger.py $FILESYSTEM_ROOT/usr/bin/ +echo "dhcp_dos_logger.service" | sudo tee -a $GENERATED_SERVICE_FILE + # Copy dhcp client configuration template and create an initial configuration sudo cp files/dhcp/dhclient.conf.j2 $FILESYSTEM_ROOT_USR_SHARE_SONIC_TEMPLATES/ j2 files/dhcp/dhclient.conf.j2 | sudo tee $FILESYSTEM_ROOT/etc/dhcp/dhclient.conf diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py index a25140593763..e4f2018e7860 100644 --- a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py @@ -34,7 +34,7 @@ def handler(): while True: for port in drop_pkts.keys(): try: - output = subprocess.run(["tc", "-s", "qdisc", "show", "dev", str(port), "handle", "ffff:"], shell=False, capture_output=True) + output = subprocess.run(["tc -s qdisc show dev {} handle ffff:".format(str(port))], shell=True, capture_output=True) if output.returncode == 0: # Check for successful execution match = re.search(r'dropped (\d+)', output.stdout) if match: diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service index 181a76323e5f..6839e94ae64c 100644 --- a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.service @@ -8,7 +8,7 @@ After=network.target [Service] Type=simple -ExecStart= /workspaces/sonic-buildimage/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py +ExecStart= /usr/bin/dhcp_dos_logger.py [Install] WantedBy=sonic.target From 72de51b75ae18e82b6ef7144ec7050ed53dad307 Mon Sep 17 00:00:00 2001 From: Asad Raza Date: Fri, 31 May 2024 18:04:51 +0500 Subject: [PATCH 10/12] Fix for semgrep --- files/image_config/dhcp_dos_logger/dhcp_dos_logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py index e4f2018e7860..96d42e7c70c9 100644 --- a/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py +++ b/files/image_config/dhcp_dos_logger/dhcp_dos_logger.py @@ -34,7 +34,7 @@ def handler(): while True: for port in drop_pkts.keys(): try: - output = subprocess.run(["tc -s qdisc show dev {} handle ffff:".format(str(port))], shell=True, capture_output=True) + output = subprocess.run(["tc", "-s", "qdisc", "show", "dev", str(port), "handle", "ffff:"], capture_output=True) if output.returncode == 0: # Check for successful execution match = re.search(r'dropped (\d+)', output.stdout) if match: From bd7d46478347929f6441a5abf29809865b21e30e Mon Sep 17 00:00:00 2001 From: muhammadalihussnain Date: Fri, 25 Oct 2024 14:58:22 +0500 Subject: [PATCH 11/12] added test_for_logger --- .../dhcp_dos_logger/test_dhcp_dos_logger.py | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 files/image_config/dhcp_dos_logger/test_dhcp_dos_logger.py diff --git a/files/image_config/dhcp_dos_logger/test_dhcp_dos_logger.py b/files/image_config/dhcp_dos_logger/test_dhcp_dos_logger.py new file mode 100644 index 000000000000..5eb71d397b2d --- /dev/null +++ b/files/image_config/dhcp_dos_logger/test_dhcp_dos_logger.py @@ -0,0 +1,65 @@ +import unittest +from unittest.mock import patch, MagicMock +import sys +import os +import subprocess +import re + +# Import the dhcp_dos_logger module directly +sys.path.append(os.path.join(os.path.dirname(__file__), '../../files/image_config/dhcp_dos_logger')) +import dhcp_dos_logger + + +class TestDhcpDosLogger(unittest.TestCase): + + @patch('dhcp_dos_logger.ConfigDBConnector') + @patch('dhcp_dos_logger.subprocess.run') + def test_handler_logging_packet_drops(self, mock_subprocess_run, mock_config_db): + """ + Test if handler logs packet drop events correctly + """ + + # Setup mocks for ConfigDB and ports + mock_config_db.return_value.get_table.return_value = {'Ethernet0': None, 'Ethernet1': None} + dhcp_dos_logger.drop_pkts = {'Ethernet0': 0, 'Ethernet1': 0} + + # Simulate output from subprocess for dropped packets on Ethernet0 and Ethernet1 + mock_subprocess_run.return_value.returncode = 0 + mock_subprocess_run.return_value.stdout = b'qdisc ffff: dev Ethernet0 root refcnt 2 limit 1000p dropped 5' + + # Patch the logger to capture log output + with patch.object(dhcp_dos_logger.logger, 'log_warning') as mock_log_warning: + dhcp_dos_logger.handler() + # Check if the warning log is called correctly when packets drop + mock_log_warning.assert_any_call("Port Ethernet0: Current DHCP drop counter is 5") + + @patch('dhcp_dos_logger.ConfigDBConnector') + @patch('dhcp_dos_logger.subprocess.run') + def test_handler_no_drops_logged(self, mock_subprocess_run, mock_config_db): + """ + Test if handler does not log when no new packet drops occur + """ + mock_config_db.return_value.get_table.return_value = {'Ethernet0': None} + dhcp_dos_logger.drop_pkts = {'Ethernet0': 5} + + # Simulate output showing no additional packet drops on Ethernet0 + mock_subprocess_run.return_value.returncode = 0 + mock_subprocess_run.return_value.stdout = b'qdisc ffff: dev Ethernet0 root refcnt 2 limit 1000p dropped 5' + + with patch.object(dhcp_dos_logger.logger, 'log_warning') as mock_log_warning: + dhcp_dos_logger.handler() + # Check that the log warning is NOT called because drop count did not increase + mock_log_warning.assert_not_called() + + @patch('dhcp_dos_logger.time.sleep', return_value=None) + def test_main_handler_runs(self, mock_sleep): + """ + Test that the main function runs the handler + """ + with patch('dhcp_dos_logger.handler') as mock_handler: + dhcp_dos_logger.main() + mock_handler.assert_called_once() + + +if __name__ == "__main__": + unittest.main() From d2e852c539512464b1e843d409bafea678ec077c Mon Sep 17 00:00:00 2001 From: muhammadalihussnain Date: Mon, 28 Oct 2024 11:20:12 +0500 Subject: [PATCH 12/12] removed the test-file --- .../dhcp_dos_logger/test_dhcp_dos_logger.py | 65 ------------------- 1 file changed, 65 deletions(-) delete mode 100644 files/image_config/dhcp_dos_logger/test_dhcp_dos_logger.py diff --git a/files/image_config/dhcp_dos_logger/test_dhcp_dos_logger.py b/files/image_config/dhcp_dos_logger/test_dhcp_dos_logger.py deleted file mode 100644 index 5eb71d397b2d..000000000000 --- a/files/image_config/dhcp_dos_logger/test_dhcp_dos_logger.py +++ /dev/null @@ -1,65 +0,0 @@ -import unittest -from unittest.mock import patch, MagicMock -import sys -import os -import subprocess -import re - -# Import the dhcp_dos_logger module directly -sys.path.append(os.path.join(os.path.dirname(__file__), '../../files/image_config/dhcp_dos_logger')) -import dhcp_dos_logger - - -class TestDhcpDosLogger(unittest.TestCase): - - @patch('dhcp_dos_logger.ConfigDBConnector') - @patch('dhcp_dos_logger.subprocess.run') - def test_handler_logging_packet_drops(self, mock_subprocess_run, mock_config_db): - """ - Test if handler logs packet drop events correctly - """ - - # Setup mocks for ConfigDB and ports - mock_config_db.return_value.get_table.return_value = {'Ethernet0': None, 'Ethernet1': None} - dhcp_dos_logger.drop_pkts = {'Ethernet0': 0, 'Ethernet1': 0} - - # Simulate output from subprocess for dropped packets on Ethernet0 and Ethernet1 - mock_subprocess_run.return_value.returncode = 0 - mock_subprocess_run.return_value.stdout = b'qdisc ffff: dev Ethernet0 root refcnt 2 limit 1000p dropped 5' - - # Patch the logger to capture log output - with patch.object(dhcp_dos_logger.logger, 'log_warning') as mock_log_warning: - dhcp_dos_logger.handler() - # Check if the warning log is called correctly when packets drop - mock_log_warning.assert_any_call("Port Ethernet0: Current DHCP drop counter is 5") - - @patch('dhcp_dos_logger.ConfigDBConnector') - @patch('dhcp_dos_logger.subprocess.run') - def test_handler_no_drops_logged(self, mock_subprocess_run, mock_config_db): - """ - Test if handler does not log when no new packet drops occur - """ - mock_config_db.return_value.get_table.return_value = {'Ethernet0': None} - dhcp_dos_logger.drop_pkts = {'Ethernet0': 5} - - # Simulate output showing no additional packet drops on Ethernet0 - mock_subprocess_run.return_value.returncode = 0 - mock_subprocess_run.return_value.stdout = b'qdisc ffff: dev Ethernet0 root refcnt 2 limit 1000p dropped 5' - - with patch.object(dhcp_dos_logger.logger, 'log_warning') as mock_log_warning: - dhcp_dos_logger.handler() - # Check that the log warning is NOT called because drop count did not increase - mock_log_warning.assert_not_called() - - @patch('dhcp_dos_logger.time.sleep', return_value=None) - def test_main_handler_runs(self, mock_sleep): - """ - Test that the main function runs the handler - """ - with patch('dhcp_dos_logger.handler') as mock_handler: - dhcp_dos_logger.main() - mock_handler.assert_called_once() - - -if __name__ == "__main__": - unittest.main()