|
1 | 1 | import json
|
2 | 2 | import jsonpatch
|
| 3 | +import importlib |
3 | 4 | from jsonpointer import JsonPointer
|
4 | 5 | import sonic_yang
|
5 | 6 | import sonic_yang_ext
|
6 | 7 | import subprocess
|
7 | 8 | import yang as ly
|
8 | 9 | import copy
|
9 | 10 | import re
|
| 11 | +import os |
10 | 12 | from sonic_py_common import logger
|
11 | 13 | from enum import Enum
|
12 | 14 |
|
13 | 15 | YANG_DIR = "/usr/local/yang-models"
|
14 | 16 | SYSLOG_IDENTIFIER = "GenericConfigUpdater"
|
| 17 | +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) |
| 18 | +GCU_FIELD_OP_CONF_FILE = f"{SCRIPT_DIR}/gcu_field_operation_validators.conf.json" |
15 | 19 |
|
16 | 20 | class GenericConfigUpdaterError(Exception):
|
17 | 21 | pass
|
@@ -157,6 +161,37 @@ def validate_field_operation(self, old_config, target_config):
|
157 | 161 | if any(op['op'] == operation and field == op['path'] for op in patch):
|
158 | 162 | raise IllegalPatchOperationError("Given patch operation is invalid. Operation: {} is illegal on field: {}".format(operation, field))
|
159 | 163 |
|
| 164 | + def _invoke_validating_function(cmd): |
| 165 | + # cmd is in the format as <package/module name>.<method name> |
| 166 | + method_name = cmd.split(".")[-1] |
| 167 | + module_name = ".".join(cmd.split(".")[0:-1]) |
| 168 | + if module_name != "generic_config_updater.field_operation_validators" or "validator" not in method_name: |
| 169 | + raise GenericConfigUpdaterError("Attempting to call invalid method {} in module {}. Module must be generic_config_updater.field_operation_validators, and method must be a defined validator".format(method_name, module_name)) |
| 170 | + module = importlib.import_module(module_name, package=None) |
| 171 | + method_to_call = getattr(module, method_name) |
| 172 | + return method_to_call() |
| 173 | + |
| 174 | + if os.path.exists(GCU_FIELD_OP_CONF_FILE): |
| 175 | + with open(GCU_FIELD_OP_CONF_FILE, "r") as s: |
| 176 | + gcu_field_operation_conf = json.load(s) |
| 177 | + else: |
| 178 | + raise GenericConfigUpdaterError("GCU field operation validators config file not found") |
| 179 | + |
| 180 | + for element in patch: |
| 181 | + path = element["path"] |
| 182 | + match = re.search(r'\/([^\/]+)(\/|$)', path) # This matches the table name in the path, eg if path if /PFC_WD/GLOBAL, the match would be PFC_WD |
| 183 | + if match is not None: |
| 184 | + table = match.group(1) |
| 185 | + else: |
| 186 | + raise GenericConfigUpdaterError("Invalid jsonpatch path: {}".format(path)) |
| 187 | + validating_functions= set() |
| 188 | + tables = gcu_field_operation_conf["tables"] |
| 189 | + validating_functions.update(tables.get(table, {}).get("field_operation_validators", [])) |
| 190 | + |
| 191 | + for function in validating_functions: |
| 192 | + if not _invoke_validating_function(function): |
| 193 | + raise IllegalPatchOperationError("Modification of {} table is illegal- validating function {} returned False".format(table, function)) |
| 194 | + |
160 | 195 | def validate_lanes(self, config_db):
|
161 | 196 | if "PORT" not in config_db:
|
162 | 197 | return True, None
|
|
0 commit comments