2
2
import json
3
3
import logging
4
4
from os .path import abspath
5
- from typing import List , Dict
5
+ from typing import List , Dict , Any
6
6
import time
7
7
8
8
from arcaflow_plugin_sdk import schema , serialization , jsonschema
9
9
from arcaflow_plugin_kill_pod import kill_pods , wait_for_pods
10
+ from krkn_lib .k8s import KrknKubernetes
11
+ from krkn_lib .k8s .pods_monitor_pool import PodsMonitorPool
12
+
10
13
import kraken .plugins .node_scenarios .vmware_plugin as vmware_plugin
11
14
import kraken .plugins .node_scenarios .ibmcloud_plugin as ibmcloud_plugin
12
15
from kraken .plugins .run_python_plugin import run_python_file
@@ -47,11 +50,14 @@ def __init__(self, steps: List[PluginStep]):
47
50
)
48
51
self .steps_by_id [step .schema .id ] = step
49
52
53
+ def unserialize_scenario (self , file : str ) -> Any :
54
+ return serialization .load_from_file (abspath (file ))
55
+
50
56
def run (self , file : str , kubeconfig_path : str , kraken_config : str ):
51
57
"""
52
58
Run executes a series of steps
53
59
"""
54
- data = serialization . load_from_file (abspath (file ))
60
+ data = self . unserialize_scenario (abspath (file ))
55
61
if not isinstance (data , list ):
56
62
raise Exception (
57
63
"Invalid scenario configuration file: {} expected list, found {}" .format (file , type (data ).__name__ )
@@ -241,18 +247,37 @@ def json_schema(self):
241
247
)
242
248
243
249
244
- def run (scenarios : List [str ], kubeconfig_path : str , kraken_config : str , failed_post_scenarios : List [str ], wait_duration : int , telemetry : KrknTelemetryKubernetes ) -> (List [str ], list [ScenarioTelemetry ]):
250
+ def run (scenarios : List [str ],
251
+ kubeconfig_path : str ,
252
+ kraken_config : str ,
253
+ failed_post_scenarios : List [str ],
254
+ wait_duration : int ,
255
+ telemetry : KrknTelemetryKubernetes ,
256
+ kubecli : KrknKubernetes
257
+ ) -> (List [str ], list [ScenarioTelemetry ]):
258
+
245
259
scenario_telemetries : list [ScenarioTelemetry ] = []
246
260
for scenario in scenarios :
247
261
scenario_telemetry = ScenarioTelemetry ()
248
262
scenario_telemetry .scenario = scenario
249
263
scenario_telemetry .startTimeStamp = time .time ()
250
264
telemetry .set_parameters_base64 (scenario_telemetry , scenario )
251
265
logging .info ('scenario ' + str (scenario ))
266
+ pool = PodsMonitorPool (kubecli )
267
+ kill_scenarios = [kill_scenario for kill_scenario in PLUGINS .unserialize_scenario (scenario ) if kill_scenario ["id" ] == "kill-pods" ]
268
+
252
269
try :
270
+ start_monitoring (pool , kill_scenarios )
253
271
PLUGINS .run (scenario , kubeconfig_path , kraken_config )
272
+ result = pool .join ()
273
+ scenario_telemetry .affected_pods = result
274
+ if result .error :
275
+ raise Exception (f"unrecovered pods: { result .error } " )
276
+
254
277
except Exception as e :
278
+ logging .error (f"scenario exception: { str (e )} " )
255
279
scenario_telemetry .exitStatus = 1
280
+ pool .cancel ()
256
281
failed_post_scenarios .append (scenario )
257
282
log_exception (scenario )
258
283
else :
@@ -263,3 +288,31 @@ def run(scenarios: List[str], kubeconfig_path: str, kraken_config: str, failed_p
263
288
scenario_telemetry .endTimeStamp = time .time ()
264
289
265
290
return failed_post_scenarios , scenario_telemetries
291
+
292
+
293
+ def start_monitoring (pool : PodsMonitorPool , scenarios : list [Any ]):
294
+ for kill_scenario in scenarios :
295
+ recovery_time = kill_scenario ["config" ]["krkn_pod_recovery_time" ]
296
+ if ("namespace_pattern" in kill_scenario ["config" ] and
297
+ "label_selector" in kill_scenario ["config" ]):
298
+ namespace_pattern = kill_scenario ["config" ]["namespace_pattern" ]
299
+ label_selector = kill_scenario ["config" ]["label_selector" ]
300
+ pool .select_and_monitor_by_namespace_pattern_and_label (
301
+ namespace_pattern = namespace_pattern ,
302
+ label_selector = label_selector ,
303
+ max_timeout = recovery_time )
304
+ logging .info (
305
+ f"waiting { recovery_time } seconds for pod recovery, "
306
+ f"pod label selector: { label_selector } namespace pattern: { namespace_pattern } " )
307
+
308
+ elif ("namespace_pattern" in kill_scenario ["config" ] and
309
+ "name_pattern" in kill_scenario ["config" ]):
310
+ namespace_pattern = kill_scenario ["config" ]["namespace_pattern" ]
311
+ name_pattern = kill_scenario ["config" ]["name_pattern" ]
312
+ pool .select_and_monitor_by_name_pattern_and_namespace_pattern (pod_name_pattern = name_pattern ,
313
+ namespace_pattern = namespace_pattern ,
314
+ max_timeout = recovery_time )
315
+ logging .info (f"waiting { recovery_time } seconds for pod recovery, "
316
+ f"pod name pattern: { name_pattern } namespace pattern: { namespace_pattern } " )
317
+ else :
318
+ raise Exception (f"impossible to determine monitor parameters, check { kill_scenario } configuration" )
0 commit comments