-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathtestTypeIperf.py
201 lines (162 loc) · 6.57 KB
/
testTypeIperf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import json
import logging
import task
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any
from typing import Optional
import tftbase
from task import ClientTask
from task import ServerTask
from task import TaskOperation
from testSettings import TestSettings
from testType import TestTypeHandler
from tftbase import BaseOutput
from tftbase import Bitrate
from tftbase import FlowTestOutput
from tftbase import TestType
logger = logging.getLogger("tft." + __name__)
IPERF_EXE = "iperf3"
IPERF_UDP_OPT = "-u -b 25G"
IPERF_REV_OPT = "-R"
class ResultTcp:
def __init__(self, data: Mapping[str, Any]):
sum_sent: Mapping[str, Any] = data["end"]["sum_sent"]
sum_received: Mapping[str, Any] = data["end"]["sum_received"]
self.transfer_sent = float(sum_sent["bytes"]) / (1024**3)
self.bitrate_sent = float(sum_sent["bits_per_second"]) / 1e9
self.transfer_received = float(sum_received["bytes"]) / (1024**3)
self.bitrate_received = float(sum_received["bits_per_second"]) / 1e9
self.mss = int(data["start"]["tcp_mss_default"])
self.sum_sent_seconds = float(sum_sent["seconds"])
self.sum_received_seconds = float(sum_received["seconds"])
self.bitrate = Bitrate(
tx=float(f"{self.bitrate_sent:.5g}"),
rx=float(f"{self.bitrate_received:.5g}"),
)
def log(self) -> None:
logger.info(
f"\n [ ID] Interval Transfer Bitrate\n"
f" [SENT] 0.00-{self.sum_sent_seconds:.2f} sec {self.transfer_sent:.2f} GBytes {self.bitrate_sent:.2f} Gbits/sec sender\n"
f" [REC] 0.00-{self.sum_received_seconds:.2f} sec {self.transfer_received:.2f} GBytes {self.bitrate_received:.2f} Gbits/sec receiver\n"
f" MSS = {self.mss}"
)
class ResultUdp:
def __init__(self, data: Mapping[str, Any]):
sum_data: Mapping[str, Any] = data["end"]["sum"]
self.total_gigabytes = float(sum_data["bytes"]) / (1024**3)
self.average_gigabitrate = float(sum_data["bits_per_second"]) / 1e9
self.average_jitter = float(sum_data["jitter_ms"])
self.total_lost_packets = float(sum_data["lost_packets"])
self.total_lost_percent = float(sum_data["lost_percent"])
self.bitrate = Bitrate(
tx=float(f"{self.average_gigabitrate:.5g}"),
rx=float(f"{self.average_gigabitrate:.5g}"),
)
def log(self) -> None:
logger.info(
f"\n Total GBytes: {self.total_gigabytes:.4f} GBytes\n"
f" Average Bitrate: {self.average_gigabitrate:.2f} Gbits/s\n"
f" Average Jitter: {self.average_jitter:.9f} ms\n"
f" Total Lost Packets: {self.total_lost_packets}\n"
f" Total Lost Percent: {self.total_lost_percent:.2f}%"
)
def _calculate_gbps(test_type: TestType, result: Mapping[str, Any]) -> Bitrate:
try:
if test_type == TestType.IPERF_TCP:
return ResultTcp(result).bitrate
else:
return ResultUdp(result).bitrate
except Exception:
return Bitrate.NA
@dataclass(frozen=True)
class TestTypeHandlerIperf(TestTypeHandler):
def _create_server_client(self, ts: TestSettings) -> tuple[ServerTask, ClientTask]:
s = IperfServer(ts=ts)
c = IperfClient(ts=ts, server=s)
return (s, c)
def can_run_reverse(self) -> bool:
if self.test_type == TestType.IPERF_TCP:
return True
return False
TestTypeHandler.register_test_type(TestTypeHandlerIperf(TestType.IPERF_TCP))
TestTypeHandler.register_test_type(TestTypeHandlerIperf(TestType.IPERF_UDP))
class IperfServer(task.ServerTask):
def cmd_line_args(self, *, for_template: bool = False) -> list[str]:
if for_template:
extra_args = []
else:
extra_args = ["--one-off", "--json"]
return [
IPERF_EXE,
"-s",
"-p",
f"{self.port}",
*extra_args,
]
def _create_setup_operation_get_cancel_action_cmd(self) -> str:
return f"killall {IPERF_EXE}"
class IperfClient(task.ClientTask):
def _create_task_operation(self) -> TaskOperation:
server_ip = self.get_target_ip()
cmd = (
f"{IPERF_EXE} -c {server_ip} -p {self.port} --json -t {self.get_duration()}"
)
if self.test_type == TestType.IPERF_UDP:
cmd += f" {IPERF_UDP_OPT}"
if self.reverse:
cmd += f" {IPERF_REV_OPT}"
def _thread_action() -> BaseOutput:
self.ts.clmo_barrier.wait()
r = self.run_oc_exec(cmd)
self.ts.event_client_finished.set()
success = True
msg: Optional[str] = None
result: dict[str, Any] = {}
if not r.success:
success = False
msg = f'Command "{cmd}" failed: {r.debug_msg()}'
if success:
try:
result = json.loads(r.out)
except Exception:
success = False
msg = f'Output of "{cmd}" is not valid JSON: {r.debug_msg()}'
if success:
if (
not result
or not isinstance(result, dict)
or not all(isinstance(k, str) for k in result)
):
success = False
msg = f'Output of "{cmd}" contains unexpected data: {r.debug_msg()}'
if success:
if "error" in result:
success = False
msg = f'Output of "{cmd}" contains "error": {r.debug_msg()}'
bitrate_gbps = _calculate_gbps(self.test_type, result)
if success:
if bitrate_gbps == Bitrate.NA:
success = False
msg = f'Output of "{cmd}" does not contain expected data: {r.debug_msg()}'
return FlowTestOutput(
success=success,
msg=msg,
tft_metadata=self.ts.get_test_metadata(),
command=cmd,
result=result,
bitrate_gbps=bitrate_gbps,
)
return TaskOperation(
log_name=self.log_name,
thread_action=_thread_action,
)
def _aggregate_output_log_success(
self,
result: tftbase.AggregatableOutput,
) -> None:
assert isinstance(result, FlowTestOutput)
if self.test_type == TestType.IPERF_TCP:
ResultTcp(result.result).log()
else:
ResultUdp(result.result).log()