diff --git a/config/main.py b/config/main.py index b1bf599a83..e200bd27f6 100644 --- a/config/main.py +++ b/config/main.py @@ -667,7 +667,7 @@ def is_storm_control_supported(storm_type, namespace): supported = state_db.get(state_db.STATE_DB, entry_name,"supported") return supported -#API to configure the PORT_STORM_CONTROL table +#API to configure the PORT_STORM_CONTROL table def storm_control_set_entry(port_name, kbps, storm_type, namespace): if storm_control_interface_validate(port_name) is False: @@ -692,7 +692,7 @@ def storm_control_set_entry(port_name, kbps, storm_type, namespace): return True -#API to remove an entry from PORT_STORM_CONTROL table +#API to remove an entry from PORT_STORM_CONTROL table def storm_control_delete_entry(port_name, storm_type): if storm_control_interface_validate(port_name) is False: @@ -4633,6 +4633,67 @@ def transceiver(ctx): """SFP transceiver configuration""" pass +# +# 'frequency' subcommand ('config interface transceiver frequency ...') +# +@transceiver.command() +@click.pass_context +@click.argument('interface_name', metavar='', required=True) +@click.argument('frequency', metavar='', required=True, type=int) +def frequency(ctx, interface_name, frequency): + """Set transciever (only for 400G-ZR) frequency""" + # Get the config_db connector + config_db = ctx.obj['config_db'] + + if clicommon.get_interface_naming_mode() == "alias": + interface_name = interface_alias_to_name(config_db, interface_name) + if interface_name is None: + ctx.fail("'interface_name' is None!") + + if interface_name_is_valid(config_db, interface_name) is False: + ctx.fail("Interface name is invalid. Please enter a valid interface name!!") + + log.log_info("{} Setting transceiver frequency {} GHz".format(interface_name, frequency)) + + if ctx.obj['namespace'] is DEFAULT_NAMESPACE: + command = "portconfig -p {} -F {}".format(interface_name, frequency) + else: + command = "portconfig -p {} -F {} -n {}".format(interface_name, frequency, ctx.obj['namespace']) + + clicommon.run_command(command) + + +# +# 'tx_power' subcommand ('config interface transceiver tx_power ...') +# For negative float use:- +# config interface transceiver tx_power Ethernet0 -- -27.4" +# +@transceiver.command('tx_power') +@click.pass_context +@click.argument('interface_name', metavar='', required=True) +@click.argument('tx-power', metavar='', required=True, type=float) +def tx_power(ctx, interface_name, tx_power): + """Set transciever (only for 400G-ZR) Tx laser power""" + # Get the config_db connector + config_db = ctx.obj['config_db'] + + if clicommon.get_interface_naming_mode() == "alias": + interface_name = interface_alias_to_name(config_db, interface_name) + if interface_name is None: + ctx.fail("'interface_name' is None!") + + if interface_name_is_valid(config_db, interface_name) is False: + ctx.fail("Interface name is invalid. Please enter a valid interface name!!") + + log.log_info("{} Setting transceiver power {} dBm".format(interface_name, tx_power)) + + if ctx.obj['namespace'] is DEFAULT_NAMESPACE: + command = "portconfig -p {} -P {}".format(interface_name, tx_power) + else: + command = "portconfig -p {} -P {} -n {}".format(interface_name, tx_power, ctx.obj['namespace']) + + clicommon.run_command(command) + # # 'lpmode' subcommand ('config interface transceiver lpmode ...') # diff --git a/scripts/portconfig b/scripts/portconfig index dcb98cb403..63bb463868 100755 --- a/scripts/portconfig +++ b/scripts/portconfig @@ -22,9 +22,12 @@ optional arguments: -t --interface-type port interface type -T --adv-interface-types port advertised interface types -lt --link-training port link training mode + -P --tx-power 400G ZR modulet target Tx output power (dBm) + -F --laser-freq 400G ZR module 75GHz grid frequency (GHz) """ import os import sys +import decimal import argparse # mock the redis for unit test purposes # @@ -51,6 +54,8 @@ PORT_ADV_SPEEDS_CONFIG_FIELD_NAME = "adv_speeds" PORT_INTERFACE_TYPE_CONFIG_FIELD_NAME = "interface_type" PORT_ADV_INTERFACE_TYPES_CONFIG_FIELD_NAME = "adv_interface_types" PORT_LINK_TRAINING_CONFIG_FIELD_NAME = "link_training" +PORT_XCVR_LASER_FREQ_FIELD_NAME = "laser_freq" +PORT_XCVR_TX_POWER_FIELD_NAME = "tx_power" PORT_CHANNEL_TABLE_NAME = "PORTCHANNEL" PORT_CHANNEL_MBR_TABLE_NAME = "PORTCHANNEL_MEMBER" TPID_CONFIG_FIELD_NAME = "tpid" @@ -152,6 +157,14 @@ class portconfig(object): mode = 'on' if mode == 'enabled' else 'off' self.db.mod_entry(PORT_TABLE_NAME, port, {PORT_AUTONEG_CONFIG_FIELD_NAME: mode}) + def set_tx_power(self, port, tx_power): + print("Setting target Tx output power to %s dBm on port %s" % (tx_power, port)) + self.db.mod_entry(PORT_TABLE_NAME, port, {PORT_XCVR_TX_POWER_FIELD_NAME: tx_power}) + + def set_laser_freq(self, port, laser_freq): + print("Setting laser frequency to %s GHz on port %s" % (laser_freq, port)) + self.db.mod_entry(PORT_TABLE_NAME, port, {PORT_XCVR_LASER_FREQ_FIELD_NAME: laser_freq}) + def set_adv_speeds(self, port, adv_speeds): if self.verbose: print("Setting adv_speeds %s on port %s" % (adv_speeds, port)) @@ -280,6 +293,10 @@ def main(): help = 'port advertised interface types', default=None) parser.add_argument('-lt', '--link-training', type = str, required = False, help = 'port link training mode', default=None) + parser.add_argument('-P', '--tx-power', type=float, required=False, + help='Tx output power(dBm)', default=None) + parser.add_argument('-F', '--laser-freq', type=int, required=False, + help='Laser frequency(GHz)', default=None) args = parser.parse_args() # Load database config files @@ -288,7 +305,9 @@ def main(): port = portconfig(args.verbose, args.port, args.namespace) if args.list: port.list_params(args.port) - elif args.speed or args.fec or args.mtu or args.link_training or args.autoneg or args.adv_speeds or args.interface_type or args.adv_interface_types or args.tpid: + elif args.speed or args.fec or args.mtu or args.link_training or args.autoneg or args.adv_speeds or \ + args.interface_type or args.adv_interface_types or args.tpid or \ + args.tx_power or args.laser_freq: if args.speed: port.set_speed(args.port, args.speed) if args.fec: @@ -307,6 +326,17 @@ def main(): port.set_adv_interface_types(args.port, args.adv_interface_types) if args.tpid: port.set_tpid(args.port, args.tpid) + if args.tx_power: + d = decimal.Decimal(str(args.tx_power)) + if d.as_tuple().exponent < -1: + print("Error: tx power must be with single decimal place") + sys.exit(1) + port.set_tx_power(args.port, args.tx_power) + if args.laser_freq: + if args.laser_freq <= 0: + print("Error: Frequency must be > 0") + sys.exit(1) + port.set_laser_freq(args.port, args.laser_freq) else: parser.print_help() sys.exit(1) diff --git a/tests/config_xcvr_test.py b/tests/config_xcvr_test.py new file mode 100644 index 0000000000..1ecc452f52 --- /dev/null +++ b/tests/config_xcvr_test.py @@ -0,0 +1,49 @@ +import click +import config.main as config +import operator +import os +import pytest +import sys + +from click.testing import CliRunner +from utilities_common.db import Db + +test_path = os.path.dirname(os.path.abspath(__file__)) +modules_path = os.path.dirname(test_path) +scripts_path = os.path.join(modules_path, "scripts") +sys.path.insert(0, modules_path) + + +@pytest.fixture(scope='module') +def ctx(scope='module'): + db = Db() + obj = {'config_db':db.cfgdb, 'namespace': ''} + yield obj + + +class TestConfigXcvr(object): + @classmethod + def setup_class(cls): + print("SETUP") + os.environ["PATH"] += os.pathsep + scripts_path + os.environ["UTILITIES_UNIT_TESTING"] = "1" + + def test_config_laser_frequency(self, ctx): + #self.basic_check("link-training", ["Ethernet0", "on"], ctx) + result = self.basic_check("frequency", ["Ethernet0", "191300"], ctx) + assert "Setting laser frequency" in result.output + result = self.basic_check("frequency", ["Ethernet0", "--", "-1"], ctx, op=operator.ne) + assert "Error: Frequency must be > 0" in result.output + + def test_config_tx_power(self, ctx): + result = self.basic_check("tx_power", ["Ethernet0", "11.3"], ctx) + assert "Setting target Tx output power" in result.output + result = self.basic_check("tx_power", ["Ethernet0", "11.34"], ctx, op=operator.ne) + assert "Error: tx power must be with single decimal place" in result.output + + def basic_check(self, command_name, para_list, ctx, op=operator.eq, expect_result=0): + runner = CliRunner() + result = runner.invoke(config.config.commands["interface"].commands["transceiver"].commands[command_name], para_list, obj = ctx) + print(result.output) + assert op(result.exit_code, expect_result) + return result