8
8
import logging
9
9
import random
10
10
from socket import socket
11
- from typing import List , Optional , Union , cast
11
+ from typing import cast
12
12
13
13
import isotp
14
14
import isotp .tpsock
26
26
ecu_logger = logging .getLogger ("somersault_lazy_ecu" )
27
27
28
28
is_sterile = False
29
- can_channel : Optional [ str ] = None
29
+ can_channel : str | None = None
30
30
somersault_lazy_diag_layer = somersaultdatabase .ecus .somersault_lazy
31
31
32
32
# the raw payload data of the telegrams received by the ECU and by the
33
33
# tester when in sterile mode (unittest without a CAN channel)
34
- sterile_rx_ecu : List [bytes ] = []
35
- sterile_rx_ecu_event : Optional [ asyncio .Event ] = None
36
- sterile_rx_tester : List [bytes ] = []
37
- sterile_rx_tester_event : Optional [ asyncio .Event ] = None
34
+ sterile_rx_ecu : list [bytes ] = []
35
+ sterile_rx_ecu_event : asyncio .Event | None = None
36
+ sterile_rx_tester : list [bytes ] = []
37
+ sterile_rx_tester_event : asyncio .Event | None = None
38
38
39
39
40
- def create_isotp_socket (channel : Optional [str ], rxid : int ,
41
- txid : int ) -> Optional [isotp .tpsock .socket ]:
40
+ def create_isotp_socket (channel : str | None , rxid : int , txid : int ) -> isotp .tpsock .socket | None :
42
41
if is_sterile :
43
42
return None
44
43
@@ -66,7 +65,7 @@ def create_isotp_socket(channel: Optional[str], rxid: int,
66
65
return result_socket
67
66
68
67
69
- async def ecu_send (isotp_socket : Optional [ isotp .tpsock .socket ] , payload : bytes ) -> None :
68
+ async def ecu_send (isotp_socket : isotp .tpsock .socket | None , payload : bytes ) -> None :
70
69
"""
71
70
ECU sends a message, either in "sterile" or in "live" mode.
72
71
"""
@@ -87,7 +86,7 @@ async def ecu_send(isotp_socket: Optional[isotp.tpsock.socket], payload: bytes)
87
86
await loop .sock_sendall (cast (socket , isotp_socket ), payload )
88
87
89
88
90
- async def ecu_recv (isotp_socket : Optional [ isotp .tpsock .socket ] ) -> bytes :
89
+ async def ecu_recv (isotp_socket : isotp .tpsock .socket | None ) -> bytes :
91
90
"""
92
91
ECU receives a message, either in "sterile" or in "live" mode.
93
92
"""
@@ -113,7 +112,7 @@ async def ecu_recv(isotp_socket: Optional[isotp.tpsock.socket]) -> bytes:
113
112
return await loop .sock_recv (cast (socket , isotp_socket ), 4095 )
114
113
115
114
116
- async def tester_send (isotp_socket : Optional [ isotp .tpsock .socket ] , payload : bytes ) -> None :
115
+ async def tester_send (isotp_socket : isotp .tpsock .socket | None , payload : bytes ) -> None :
117
116
"""
118
117
Tester sends a message, either in "sterile" or in "live" mode.
119
118
"""
@@ -133,7 +132,7 @@ async def tester_send(isotp_socket: Optional[isotp.tpsock.socket], payload: byte
133
132
await loop .sock_sendall (cast (socket , isotp_socket ), payload )
134
133
135
134
136
- async def tester_recv (isotp_socket : Optional [ isotp .tpsock .socket ] ) -> bytes :
135
+ async def tester_recv (isotp_socket : isotp .tpsock .socket | None ) -> bytes :
137
136
"""
138
137
Tester receives a message, either in "sterile" or in "live" mode.
139
138
"""
@@ -360,9 +359,9 @@ async def _auto_close_session_task(self) -> None:
360
359
continue
361
360
362
361
363
- async def tester_await_response (isotp_socket : Optional [ isotp .tpsock .socket ] ,
362
+ async def tester_await_response (isotp_socket : isotp .tpsock .socket | None ,
364
363
raw_message : bytes ,
365
- timeout : float = 0.5 ) -> Union [ bytes , ParameterValueDict ] :
364
+ timeout : float = 0.5 ) -> bytes | ParameterValueDict :
366
365
# await the answer from the server (be aware that the maximum
367
366
# length of ISO-TP telegrams over the CAN bus is 4095 bytes)
368
367
raw_response = await tester_recv (isotp_socket )
0 commit comments