-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrt_harness_discovery.py
63 lines (53 loc) · 2.17 KB
/
rt_harness_discovery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from .harness import Harness
from .interface.error import TestError, Error
from . import log
from .utils import Timeout
import time
simulator_port = 'simulator'
def get_harness_automatic(timeout, retry_time_s=1) -> Harness:
'''
automatic harness connection discovery, polls for harness phone connection on uart
timeout - time we wait for connection
retry_time_s - time between open retries
'''
harness = None
# There is a period of time between the moment the device is detected and when it is ready
# to accept commands. All data sent during this window will be lost, so wait for the time
# defined here before sending anything.
detection_to_readiness_time = 5
log.debug(f'await for port {timeout}s')
with Timeout.limit(seconds=timeout):
while not harness:
try:
harness = Harness.from_detect()
except TestError as e:
if e.get_error_code() == Error.PORT_NOT_FOUND:
time.sleep(retry_time_s)
if harness is not None:
log.debug(f'found port, waiting {detection_to_readiness_time}s for device to be ready')
time.sleep(detection_to_readiness_time)
return harness
def get_harness_by_port_name(port: str, timeout, retry_time_s=1) -> Harness:
'''
if port is named `simulator` then try to open simulator /tmp/purephone_pts_name
else try uart specified
port - port to open either /dev/... or `simulator`
timeout - timeout to try to open port
retry_time_s - time between open retries
'''
assert '/dev' in port or simulator_port in port
if simulator_port in port:
file = None
log.debug(f'await for port {timeout}s')
with Timeout.limit(seconds=timeout):
while not file:
try:
file = open("/tmp/purephone_pts_name", "r")
except FileNotFoundError:
time.sleep(retry_time_s)
port = file.readline()
if port.isascii():
log.debug("found {} entry!".format(port))
else:
raise ValueError("not a valid sim pts entry!")
return Harness(port)