Skip to content

Commit 5d8dced

Browse files
committed
Add heartbeat for sub_devices rospogrigio#194
1 parent 4b0282a commit 5d8dced

File tree

2 files changed

+86
-17
lines changed

2 files changed

+86
-17
lines changed

custom_components/localtuya/coordinator.py

+1
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ def _new_entity_handler(entity_id):
256256
self.debug(f"Success: connected to {host}", force=True)
257257

258258
if self._sub_devices:
259+
self._interface.start_sub_devices_heartbeat()
259260
for subdevice in self._sub_devices.values():
260261
self._hass.async_create_task(subdevice.async_connect())
261262

custom_components/localtuya/core/pytuya/__init__.py

+85-17
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,11 @@ class DecodeError(Exception):
177177
SESS_KEY_NEG_START,
178178
SESS_KEY_NEG_RESP,
179179
SESS_KEY_NEG_FINISH,
180+
LAN_EXT_STREAM,
180181
]
181182

182183
HEARTBEAT_INTERVAL = 10
184+
HEARTBEAT_SUB_DEVICES_INTERVAL = 30
183185

184186
# DPS that are known to be safe to use with update_dps (0x12) command
185187
UPDATE_DPS_WHITELIST = [18, 19, 20] # Socket (Wi-Fi)
@@ -216,6 +218,7 @@ class DecodeError(Exception):
216218
CONTROL_NEW: {"command": {"devId": "", "uid": "", "t": "", "cid": ""}},
217219
DP_QUERY_NEW: {"command": {"devId": "", "uid": "", "t": "", "cid": ""}},
218220
UPDATEDPS: {"command": {"dpId": [18, 19, 20], "cid": ""}},
221+
LAN_EXT_STREAM: {"command": {"reqType": "", "data": {}}},
219222
},
220223
# Special Case Device "0d" - Some of these devices
221224
# Require the 0d command as the DP_QUERY status request and the list of
@@ -592,7 +595,9 @@ def abort(self):
592595
async def wait_for(self, seqno, cmd, timeout=5):
593596
"""Wait for response to a sequence number to be received and return it."""
594597
if seqno in self.listeners:
595-
raise Exception(f"listener exists for {seqno}")
598+
self.debug(f"listener exists for {seqno}")
599+
if seqno == self.HEARTBEAT_SEQNO:
600+
raise Exception(f"listener exists for {seqno}")
596601

597602
self.debug("Command %d waiting for seq. number %d", cmd, seqno)
598603
self.listeners[seqno] = asyncio.Semaphore(0)
@@ -603,8 +608,9 @@ async def wait_for(self, seqno, cmd, timeout=5):
603608
"Command %d timed out waiting for sequence number %d", cmd, seqno
604609
)
605610
del self.listeners[seqno]
606-
raise
607-
611+
raise TimeoutError(
612+
f"Command {cmd} timed out waiting for sequence number {seqno}"
613+
)
608614
return self.listeners.pop(seqno)
609615

610616
def add_data(self, data):
@@ -653,11 +659,12 @@ def add_data(self, data):
653659
def _dispatch(self, msg):
654660
"""Dispatch a message to someone that is listening."""
655661

656-
self.debug("Dispatching message CMD %r %s", msg.cmd, msg)
662+
self.debug("Dispatching message CMD %r %s", msg.cmd, msg, force=True)
657663

658664
if msg.seqno in self.listeners:
659665
self.debug("Dispatching sequence number %d", msg.seqno)
660666
self._release_listener(msg.seqno, msg)
667+
661668
if msg.cmd == HEART_BEAT:
662669
self.debug("Got heartbeat response")
663670
self._release_listener(self.HEARTBEAT_SEQNO, msg)
@@ -674,9 +681,12 @@ def _dispatch(self, msg):
674681
else:
675682
self.debug("Got status update")
676683
self.callback_status_update(msg)
684+
elif msg.cmd == LAN_EXT_STREAM and msg.payload:
685+
self.debug(f"Got Sub-devices status update")
686+
self.callback_status_update(msg)
677687
else:
678-
if msg.cmd == CONTROL_NEW:
679-
self.debug("Got ACK message for command %d: will ignore it", msg.cmd)
688+
if msg.cmd == CONTROL_NEW or not msg.payload:
689+
self.debug("Got ACK message for command %d: will ignore it %s", msg.cmd)
680690
else:
681691
self.debug(
682692
"Got message type %d for unknown listener %d: %s",
@@ -769,6 +779,7 @@ def __init__(
769779
self.on_connected = on_connected
770780
self.heartbeater = None
771781
self.dps_cache = {}
782+
self.sub_devices_states = {} # {"Online": [cid,...], "offline": [cid...]}
772783
self.local_nonce = b"0123456789abcdef" # not-so-random random key
773784
self.remote_nonce = b""
774785
self.dps_whitelist = UPDATE_DPS_WHITELIST
@@ -809,6 +820,22 @@ def _status_update(msg):
809820
decoded_message: dict = self._decode_payload(msg.payload)
810821
cid = None
811822

823+
# Handle sub-devices states update.
824+
if msg.cmd == LAN_EXT_STREAM:
825+
self.debug(f"Sub-Devices States Update: {decoded_message}")
826+
if (data := decoded_message.get("data")) and isinstance(data, dict):
827+
self.sub_devices_states.update(data)
828+
listener = self.listener and self.listener()
829+
if listener is None:
830+
return
831+
832+
on_devices = data.get("online", [])
833+
off_devices = data.get("offline", [])
834+
for cid, device in listener._sub_devices.items():
835+
if cid in off_devices or cid not in on_devices:
836+
device.disconnected()
837+
return
838+
812839
if "dps" not in decoded_message:
813840
return
814841

@@ -883,9 +910,28 @@ async def heartbeat_loop():
883910
# Prevent duplicates heartbeat task
884911
self.heartbeater = self.loop.create_task(heartbeat_loop())
885912

913+
def start_sub_devices_heartbeat(self):
914+
"""Update the states of subdevices every 30sec. this function only be called once."""
915+
916+
async def heartbeat_loop():
917+
"""Continuously send heart beat updates."""
918+
self.debug("Start a heartbeat for sub-devices")
919+
# This will break if main "heartbeat" stopped
920+
while self.heartbeater:
921+
try:
922+
await self.subdevices_query()
923+
await asyncio.sleep(HEARTBEAT_SUB_DEVICES_INTERVAL)
924+
except (Exception, asyncio.CancelledError) as ex:
925+
self.debug(f"Sub-devices heartbeat stopped due to: {ex}")
926+
break
927+
928+
if self.heartbeater:
929+
# Prevent duplicates heartbeat task
930+
self.loop.create_task(heartbeat_loop())
931+
886932
def data_received(self, data):
887933
"""Received data from device."""
888-
# self.debug("received data=%r", binascii.hexlify(data))
934+
# self.debug("received data=%r", binascii.hexlify(data), force=True)
889935
self.dispatcher.add_data(data)
890936

891937
def connection_lost(self, exc):
@@ -960,7 +1006,7 @@ async def exchange_quick(self, payload, recv_retries):
9601006
)
9611007
return None
9621008

963-
async def exchange(self, command, dps=None, nodeID=None, delay=True):
1009+
async def exchange(self, command, dps=None, nodeID=None, delay=True, payload=None):
9641010
"""Send and receive a message, returning response from device."""
9651011
if self.version >= 3.4 and self.real_local_key == self.local_key:
9661012
self.debug("3.4 or 3.5 device: negotiating a new session key")
@@ -969,7 +1015,7 @@ async def exchange(self, command, dps=None, nodeID=None, delay=True):
9691015
self.debug(
9701016
"Sending command %s (device type: %s) DPS: %s", command, self.dev_type, dps
9711017
)
972-
payload = self._generate_payload(command, dps, nodeId=nodeID)
1018+
payload = payload or self._generate_payload(command, dps, nodeId=nodeID)
9731019
real_cmd = payload.cmd
9741020
dev_type = self.dev_type
9751021
# self.debug("Exchange: payload %r %r", payload.cmd, payload.payload)
@@ -994,7 +1040,7 @@ async def exchange(self, command, dps=None, nodeID=None, delay=True):
9941040
if real_cmd in [HEART_BEAT, CONTROL, CONTROL_NEW] and len(msg.payload) == 0:
9951041
# device may send messages with empty payload in response
9961042
# to a HEART_BEAT or CONTROL or CONTROL_NEW command: consider them an ACK
997-
self.debug("ACK received for command %d: ignoring it", real_cmd)
1043+
self.debug(f"ACK received for command {real_cmd}: ignoring: {msg.seqno}")
9981044
return None
9991045
payload = self._decode_payload(msg.payload)
10001046

@@ -1075,6 +1121,15 @@ async def set_dps(self, dps, cid=None):
10751121
"""Set values for a set of datapoints."""
10761122
return await self.exchange(CONTROL, dps, nodeID=cid)
10771123

1124+
async def subdevices_query(self):
1125+
"""Request a list of sub-devices and their status."""
1126+
# Return payload: {"online": [cid1, ...], "offline": [cid2, ...]}
1127+
payload = self._generate_payload(
1128+
LAN_EXT_STREAM, rawData={"cids": []}, reqType="subdev_online_stat_query"
1129+
)
1130+
1131+
return await self.exchange(command=LAN_EXT_STREAM, payload=payload)
1132+
10781133
async def detect_available_dps(self, cid=None):
10791134
"""Return which datapoints are supported by the device."""
10801135
# type_0d devices need a sort of bruteforce querying in order to detect the
@@ -1182,12 +1237,14 @@ def _decode_payload(self, payload):
11821237
try:
11831238
json_payload = json.loads(payload)
11841239
except Exception as ex:
1240+
json_payload = self.error_json(ERR_JSON, payload)
1241+
11851242
if "devid not" in payload: # DeviceID Not found.
11861243
raise ValueError(f"DeviceID [{self.id}] Not found")
1187-
else:
1188-
raise DecodeError(
1189-
f"[{self.id}]: could not decrypt data: wrong local_key? (exception: {ex}, payload: {payload})"
1190-
)
1244+
# else:
1245+
# raise DecodeError(
1246+
# f"[{self.id}]: could not decrypt data: wrong local_key? (exception: {ex}, payload: {payload})"
1247+
# )
11911248
# json_payload = self.error_json(ERR_JSON, payload)
11921249

11931250
# v3.4 stuffs it into {"data":{"dps":{"1":true}}, ...}
@@ -1341,7 +1398,15 @@ def _encode_message(self, msg):
13411398
return buffer
13421399

13431400
def _generate_payload(
1344-
self, command, data=None, gwId=None, devId=None, uid=None, nodeId=None
1401+
self,
1402+
command,
1403+
data=None,
1404+
gwId=None,
1405+
devId=None,
1406+
uid=None,
1407+
nodeId=None,
1408+
rawData=None,
1409+
reqType=None,
13451410
):
13461411
"""
13471412
Generate the payload to send.
@@ -1429,8 +1494,9 @@ def deepcopy_dict(_dict: dict):
14291494
json_data["t"] = int(time.time())
14301495
else:
14311496
json_data["t"] = str(int(time.time()))
1432-
1433-
if data is not None:
1497+
if rawData is not None and "data" in json_data:
1498+
json_data["data"] = rawData
1499+
elif data is not None:
14341500
if "dpId" in json_data:
14351501
json_data["dpId"] = data
14361502
elif "data" in json_data:
@@ -1439,6 +1505,8 @@ def deepcopy_dict(_dict: dict):
14391505
json_data["dps"] = data
14401506
elif self.dev_type == "type_0d" and command == DP_QUERY:
14411507
json_data["dps"] = self.dps_to_request
1508+
if reqType and "reqType" in json_data:
1509+
json_data["reqType"] = reqType
14421510

14431511
if json_data == "":
14441512
payload = ""

0 commit comments

Comments
 (0)