Skip to content

Commit df54cb8

Browse files
authored
Add support for router firmware (#85)
* Initial router protocol * add router types to const * Integrate router firmware throughout * Update README * Fix version parsing for GA build string * Add router to flasher default * fix use of asyncio_timeout * Fix ruff lint
1 parent 2b8568b commit df54cb8

File tree

6 files changed

+146
-3
lines changed

6 files changed

+146
-3
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ Options:
1717
--bootloader-baudrate NUMBERS [default: 115200]
1818
--cpc-baudrate NUMBERS [default: 460800, 115200, 230400]
1919
--ezsp-baudrate NUMBERS [default: 115200]
20-
--spinel-baudrate NUMBERS [default: 460800]
21-
--probe-method TEXT [default: bootloader, cpc, ezsp, spinel]
20+
--router-baudrate NUMBERS [default: 115200]
21+
--spinel-baudrate NUMBERS [default: 460800]
22+
--probe-method TEXT [default: bootloader, cpc, ezsp, spinel,
23+
router]
2224
--bootloader-reset [yellow|ihost|slzb07|sonoff]
2325
--help Show this message and exit.
2426

universal_silabs_flasher/common.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ class VersionComponent:
181181

182182
@functools.total_ordering
183183
class Version:
184-
_SEPARATORS = {".", "-", "/", "_", " build "}
184+
_SEPARATORS = {".", "-", "/", "_", " build ", " GA build "}
185185
_SEPARATORS_REGEX = re.compile(
186186
"(" + "|".join(re.escape(s) for s in _SEPARATORS) + ")"
187187
)

universal_silabs_flasher/const.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
class FirmwareImageType(enum.Enum):
55
ZIGBEE_NCP = "zigbee_ncp"
6+
ZIGBEE_ROUTER = "zigbee_router"
67
OPENTHREAD_RCP = "openthread_rcp"
78
ZWAVE_NCP = "zwave_ncp"
89
BOOTLOADER = "bootloader"
@@ -26,13 +27,15 @@ class ApplicationType(enum.Enum):
2627
CPC = "cpc"
2728
EZSP = "ezsp"
2829
SPINEL = "spinel"
30+
ROUTER = "router"
2931

3032

3133
FW_IMAGE_TYPE_TO_APPLICATION_TYPE = {
3234
FirmwareImageType.ZIGBEE_NCP: ApplicationType.EZSP,
3335
FirmwareImageType.MULTIPAN: ApplicationType.CPC,
3436
FirmwareImageType.OPENTHREAD_RCP: ApplicationType.SPINEL,
3537
FirmwareImageType.BOOTLOADER: ApplicationType.GECKO_BOOTLOADER,
38+
FirmwareImageType.ZIGBEE_ROUTER: ApplicationType.ROUTER,
3639
}
3740

3841

@@ -41,6 +44,7 @@ class ApplicationType(enum.Enum):
4144
ApplicationType.CPC: [460800, 115200, 230400],
4245
ApplicationType.EZSP: [115200],
4346
ApplicationType.SPINEL: [460800],
47+
ApplicationType.ROUTER: [115200],
4448
}
4549

4650

universal_silabs_flasher/flash.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@ def convert(self, value: tuple | str, param: click.Parameter, ctx: click.Context
123123
type=CommaSeparatedNumbers(),
124124
show_default=True,
125125
)
126+
@click.option(
127+
"--router-baudrate",
128+
default=DEFAULT_BAUDRATES[ApplicationType.ROUTER],
129+
type=CommaSeparatedNumbers(),
130+
show_default=True,
131+
)
126132
@click.option(
127133
"--spinel-baudrate",
128134
default=DEFAULT_BAUDRATES[ApplicationType.SPINEL],
@@ -149,6 +155,7 @@ def main(
149155
bootloader_baudrate: list[int],
150156
cpc_baudrate: list[int],
151157
ezsp_baudrate: list[int],
158+
router_baudrate: list[int],
152159
spinel_baudrate: list[int],
153160
probe_method: list[ApplicationType],
154161
bootloader_reset: str | None,
@@ -190,6 +197,7 @@ def main(
190197
ApplicationType.GECKO_BOOTLOADER: bootloader_baudrate,
191198
ApplicationType.CPC: cpc_baudrate,
192199
ApplicationType.EZSP: ezsp_baudrate,
200+
ApplicationType.ROUTER: router_baudrate,
193201
ApplicationType.SPINEL: spinel_baudrate,
194202
},
195203
probe_methods=probe_method,
@@ -341,6 +349,8 @@ async def flash(
341349

342350
if flasher.app_type == ApplicationType.EZSP:
343351
running_image_type = FirmwareImageType.ZIGBEE_NCP
352+
elif flasher.app_type == ApplicationType.ROUTER:
353+
running_image_type = FirmwareImageType.ZIGBEE_ROUTER
344354
elif flasher.app_type == ApplicationType.SPINEL:
345355
running_image_type = FirmwareImageType.OPENTHREAD_RCP
346356
elif flasher.app_type == ApplicationType.CPC:

universal_silabs_flasher/flasher.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from .firmware import FirmwareImage
2525
from .gecko_bootloader import GeckoBootloaderProtocol, NoFirmwareError
2626
from .gpio import find_gpiochip_by_label, send_gpio_pattern
27+
from .router import RouterProtocol
2728
from .spinel import SpinelProtocol
2829
from .xmodemcrc import BLOCK_SIZE as XMODEM_BLOCK_SIZE
2930

@@ -48,6 +49,7 @@ def __init__(
4849
ApplicationType.GECKO_BOOTLOADER,
4950
ApplicationType.CPC,
5051
ApplicationType.EZSP,
52+
ApplicationType.ROUTER,
5153
ApplicationType.SPINEL,
5254
),
5355
device: str,
@@ -103,6 +105,9 @@ def _connect_cpc(self, baudrate: int):
103105
def _connect_ezsp(self, baudrate: int):
104106
return connect_ezsp(self._device, baudrate)
105107

108+
def _connect_router(self, baudrate: int):
109+
return connect_protocol(self._device, baudrate, RouterProtocol)
110+
106111
def _connect_spinel(self, baudrate: int):
107112
return connect_protocol(self._device, baudrate, SpinelProtocol)
108113

@@ -150,6 +155,16 @@ async def probe_ezsp(self, baudrate: int) -> ProbeResult:
150155
continue_probing=False,
151156
)
152157

158+
async def probe_router(self, baudrate: int) -> ProbeResult:
159+
async with self._connect_router(baudrate) as router:
160+
version = await router.probe()
161+
162+
return ProbeResult(
163+
version=version,
164+
baudrate=baudrate,
165+
continue_probing=False,
166+
)
167+
153168
async def probe_spinel(self, baudrate: int) -> ProbeResult:
154169
async with self._connect_spinel(baudrate) as spinel:
155170
version = await spinel.probe()
@@ -194,6 +209,7 @@ async def probe_app_type(
194209
ApplicationType.CPC: self.probe_cpc,
195210
ApplicationType.EZSP: self.probe_ezsp,
196211
ApplicationType.SPINEL: self.probe_spinel,
212+
ApplicationType.ROUTER: self.probe_router,
197213
}
198214

199215
for probe_method, baudrate in (
@@ -266,6 +282,10 @@ async def enter_bootloader(self) -> None:
266282
async with self._connect_spinel(self.app_baudrate) as spinel:
267283
async with asyncio_timeout(PROBE_TIMEOUT):
268284
await spinel.enter_bootloader()
285+
elif self.app_type is ApplicationType.ROUTER:
286+
async with self._connect_router(self.app_baudrate) as router:
287+
async with asyncio_timeout(PROBE_TIMEOUT):
288+
await router.enter_bootloader()
269289
elif self.app_type is ApplicationType.EZSP:
270290
async with self._connect_ezsp(self.app_baudrate) as ezsp:
271291
try:

universal_silabs_flasher/router.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import enum
5+
import logging
6+
import re
7+
8+
from zigpy.serial import SerialProtocol
9+
10+
from .common import PROBE_TIMEOUT, StateMachine, Version, asyncio_timeout
11+
12+
_LOGGER = logging.getLogger(__name__)
13+
14+
ROUTER_INFO_REGEX = re.compile(rb"stack ver\. \[(?P<version>.*?)\]\r\n")
15+
16+
17+
class State(str, enum.Enum):
18+
STARTUP = "startup"
19+
BOOTWAIT = "bootwait"
20+
INFO = "info"
21+
READY = "ready"
22+
23+
24+
class RouterCommand(bytes, enum.Enum):
25+
INFO = b"version\r\n"
26+
BL_REBOOT = b"bootloader reboot\r\n"
27+
28+
29+
class RouterProtocol(SerialProtocol):
30+
def __init__(self) -> None:
31+
super().__init__()
32+
self._state_machine = StateMachine(
33+
states=list(State),
34+
initial=State.STARTUP,
35+
)
36+
self._version: str | None = None
37+
38+
async def probe(self) -> Version:
39+
"""Attempt to communicate with the router."""
40+
async with asyncio_timeout(PROBE_TIMEOUT):
41+
return await self.router_info()
42+
43+
async def router_info(self) -> Version:
44+
"""Get the router version."""
45+
await self.activate_prompt()
46+
self._state_machine.state = State.INFO
47+
self.send_data(RouterCommand.INFO)
48+
49+
await self._state_machine.wait_for_state(State.READY)
50+
51+
assert self._version is not None
52+
return Version(self._version)
53+
54+
async def activate_prompt(self) -> None:
55+
"""Send enter key to activate CLI prompt."""
56+
if self._state_machine.state == State.STARTUP:
57+
await asyncio.sleep(0.5)
58+
self.send_data(b"\r\n")
59+
await self._state_machine.wait_for_state(State.READY)
60+
61+
def send_data(self, data: bytes) -> None:
62+
assert self._transport is not None
63+
_LOGGER.debug("Sending data %s", data)
64+
self._transport.write(data)
65+
66+
def data_received(self, data: bytes) -> None:
67+
super().data_received(data)
68+
69+
while self._buffer:
70+
_LOGGER.debug("Parsing %s: %r", self._state_machine.state, self._buffer)
71+
if self._state_machine.state == State.STARTUP:
72+
if b"\n>" not in self._buffer:
73+
return
74+
75+
self._buffer.clear()
76+
self._state_machine.state = State.READY
77+
78+
if self._state_machine.state == State.INFO:
79+
match = ROUTER_INFO_REGEX.search(self._buffer)
80+
81+
if match is None:
82+
return
83+
84+
self._version = match.group("version").decode("ascii")
85+
_LOGGER.debug("Detected version string %r", self._version)
86+
87+
self._buffer.clear()
88+
self._state_machine.state = State.READY
89+
90+
elif self._state_machine.state == State.BOOTWAIT:
91+
if b"Gecko Bootloader" not in self._buffer:
92+
return
93+
94+
_LOGGER.debug("Bootloader started")
95+
96+
self._buffer.clear()
97+
self._state_machine.state = State.READY
98+
99+
elif self._state_machine.state == State.READY:
100+
self._buffer.clear()
101+
102+
async def enter_bootloader(self) -> None:
103+
await self.activate_prompt()
104+
self._state_machine.state = State.BOOTWAIT
105+
106+
self.send_data(RouterCommand.BL_REBOOT)
107+
await self._state_machine.wait_for_state(State.READY)

0 commit comments

Comments
 (0)