Skip to content

Commit 8c1748f

Browse files
committed
Changed type for bytes converter to parse bytes as ascii numbers, not utf8 string, added hexMode to bytes mqtt uplink converter
1 parent c7279ef commit 8c1748f

File tree

3 files changed

+42
-24
lines changed

3 files changed

+42
-24
lines changed

thingsboard_gateway/connectors/mqtt/bytes_mqtt_uplink_converter.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515

1616
class BytesMqttUplinkConverter(MqttUplinkConverter):
17+
SUPPORTS_BYTES_PAYLOAD = True
1718
def __init__(self, config, logger):
1819
self.__config = config.get('converter')
1920
self._log = logger
@@ -48,8 +49,8 @@ def convert(self, topic, data):
4849
try:
4950
for datatype in datatypes:
5051
for datatype_config in self.__config.get(datatype, []):
51-
key = self.parse_data(datatype_config['key'], data)
52-
value = self.parse_data(datatype_config['value'], data)
52+
key = self.parse_data(datatype_config['key'], data, hex_mode=datatype_config.get('hexMode', False))
53+
value = self.parse_data(datatype_config['value'], data, hex_mode=datatype_config.get('hexMode', False))
5354
datapoint_key = TBUtility.convert_key_to_datapoint_key(key,
5455
device_report_strategy,
5556
datatype_config,
@@ -74,23 +75,25 @@ def convert(self, topic, data):
7475
return converted_data
7576

7677
@staticmethod
77-
def parse_data(expression, data):
78+
def parse_data(expression: str, data: list, hex_mode: bool = False) -> str:
7879
expression_arr = findall(r'\[\S[0-9:]*]', expression)
7980
converted_data = expression
8081

8182
for exp in expression_arr:
8283
indexes = exp[1:-1].split(':')
83-
8484
data_to_replace = ''
85+
8586
if len(indexes) == 2:
86-
from_index, to_index = indexes
87-
concat_arr = data[
88-
int(from_index) if from_index != '' else None:int(
89-
to_index) if to_index != '' else None]
90-
for sub_item in concat_arr:
91-
data_to_replace += str(sub_item)
87+
from_index = int(indexes[0]) if indexes[0] else None
88+
to_index = int(indexes[1]) if indexes[1] else None
89+
slice_data = data[from_index:to_index]
90+
else:
91+
slice_data = [data[int(indexes[0])]]
92+
93+
if hex_mode:
94+
data_to_replace = ''.join(f'{int(b):02x}' for b in slice_data)
9295
else:
93-
data_to_replace += str(data[int(indexes[0])])
96+
data_to_replace = ''.join(str(b) for b in slice_data)
9497

9598
converted_data = converted_data.replace(exp, data_to_replace)
9699

thingsboard_gateway/connectors/mqtt/mqtt_connector.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,8 @@ def _on_subscribe(self, _, __, mid, granted_qos, *args):
544544

545545
def put_data_to_convert(self, converter, message, content) -> bool:
546546
if not self.__msg_queue.full():
547+
if not hasattr(converter, 'SUPPORTS_BYTES_PAYLOAD'):
548+
content = TBUtility.decode(content)
547549
self.__msg_queue.put((converter.convert, message.topic, content), True, 100)
548550
return True
549551
return False
@@ -615,7 +617,7 @@ def _process_on_message(self):
615617
client, userdata, message = self._on_message_queue.get_nowait()
616618

617619
self.statistics['MessagesReceived'] += 1
618-
content = TBUtility.decode(message)
620+
content = None
619621

620622
# Check if message topic exists in mappings "i.e., I'm posting telemetry/attributes" -------------------
621623
topic_handlers = [regex for regex in self.__mapping_sub_topics if fullmatch(regex, message.topic)]
@@ -632,7 +634,7 @@ def _process_on_message(self):
632634
available_converters = self.__mapping_sub_topics[topic]
633635
for converter in available_converters:
634636
try:
635-
request_handled = self.put_data_to_convert(converter, message, content)
637+
request_handled = self.put_data_to_convert(converter, message, message.payload)
636638
except Exception as e:
637639
self.__log.exception(e)
638640

@@ -651,6 +653,8 @@ def _process_on_message(self):
651653
fullmatch(regex, message.topic)]
652654

653655
if topic_handlers:
656+
if content is None:
657+
content = TBUtility.decode(message)
654658
for topic in topic_handlers:
655659
handler = self.__connect_requests_sub_topics[topic]
656660

@@ -677,6 +681,8 @@ def _process_on_message(self):
677681
topic_handlers = [regex for regex in self.__disconnect_requests_sub_topics if
678682
fullmatch(regex, message.topic)]
679683
if topic_handlers:
684+
if content is None:
685+
content = TBUtility.decode(message)
680686
for topic in topic_handlers:
681687
handler = self.__disconnect_requests_sub_topics[topic]
682688

@@ -705,6 +711,8 @@ def _process_on_message(self):
705711
topic_handlers = [regex for regex in self.__attribute_requests_sub_topics if
706712
fullmatch(regex, message.topic)]
707713
if topic_handlers:
714+
if content is None:
715+
content = TBUtility.decode(message)
708716
try:
709717
for topic in topic_handlers:
710718
handler = self.__attribute_requests_sub_topics[topic]

thingsboard_gateway/tb_utility/tb_utility.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,25 @@ class TBUtility:
5050
@staticmethod
5151
def decode(message, return_raw=False):
5252
if return_raw:
53-
return message.payload
54-
try:
55-
if isinstance(message.payload, bytes):
56-
content = loads(message.payload.decode("utf-8", "ignore"))
57-
else:
58-
content = loads(message.payload)
59-
except JSONDecodeError:
53+
return getattr(message, 'payload', message)
54+
55+
payload = message if isinstance(message, bytes) else getattr(message, 'payload', None)
56+
if payload is None:
57+
return message
58+
59+
if isinstance(payload, bytes):
60+
try:
61+
return loads(payload.decode("utf-8", "ignore"))
62+
except JSONDecodeError:
63+
try:
64+
return payload.decode("utf-8")
65+
except UnicodeDecodeError:
66+
return payload
67+
else:
6068
try:
61-
content = message.payload.decode("utf-8")
62-
except UnicodeDecodeError:
63-
content = message.payload
64-
return content
69+
return loads(payload)
70+
except JSONDecodeError:
71+
return payload
6572

6673
@staticmethod
6774
def validate_converted_data(data: Union[dict, 'ConvertedData']):

0 commit comments

Comments
 (0)