17
17
from datetime import datetime
18
18
import paho .mqtt .client as mqtt
19
19
20
+
20
21
class BrokerConnect ():
21
22
"""Broker connection class."""
23
+
22
24
def __init__ (self , state ):
23
25
self .state = state
24
26
self .client = None
@@ -50,7 +52,8 @@ def disconnect(self):
50
52
if self .client is not None :
51
53
self .client .loop_stop ()
52
54
self .client .disconnect ()
53
- self .state .print_status (description = "Disconnected from message broker." )
55
+ description = "Disconnected from message broker."
56
+ self .state .print_status (description = description )
54
57
55
58
def wrap_message (self , message , priority = None ):
56
59
"""Wrap message in CeleryScript format."""
@@ -78,22 +81,31 @@ def publish(self, message):
78
81
rpc = self .wrap_message (rpc )
79
82
80
83
if rpc ["args" ]["label" ] == "" :
81
- rpc ["args" ]["label" ] = uuid .uuid4 ().hex if not self .state .test_env else "test"
84
+ if self .state .test_env :
85
+ rpc ["args" ]["label" ] = "test"
86
+ else :
87
+ rpc ["args" ]["label" ] = uuid .uuid4 ().hex
82
88
83
89
self .state .print_status (description = "Publishing to 'from_clients'" )
84
90
self .state .print_status (endpoint_json = rpc , update_only = True )
85
91
if self .state .dry_run :
86
- self .state .print_status (description = "Sending disabled, message not sent." , update_only = True )
92
+ self .state .print_status (
93
+ description = "Sending disabled, message not sent." ,
94
+ update_only = True )
87
95
else :
88
96
self .listen ("from_device" , publish_payload = rpc )
89
97
90
98
response = self .state .last_messages .get ("from_device" , [])
91
99
if len (response ) > 0 :
92
100
if response [- 1 ]["kind" ] == "rpc_ok" :
93
- self .state .print_status (description = "Success response received." , update_only = True )
101
+ self .state .print_status (
102
+ description = "Success response received." ,
103
+ update_only = True )
94
104
self .state .error = None
95
105
else :
96
- self .state .print_status (description = "Error response received." , update_only = True )
106
+ self .state .print_status (
107
+ description = "Error response received." ,
108
+ update_only = True )
97
109
self .state .error = "RPC error response received."
98
110
99
111
self .state .last_published = rpc
@@ -131,8 +143,8 @@ def on_message(_client, _userdata, msg):
131
143
132
144
if diff_only :
133
145
diff = payload
134
- prev_channel_key = path_channel if len (path ) > 0 else channel_key
135
- last_messages = self .state .last_messages .get (prev_channel_key , [])
146
+ key = path_channel if len (path ) > 0 else channel_key
147
+ last_messages = self .state .last_messages .get (key , [])
136
148
if len (last_messages ) > 1 :
137
149
current = last_messages [- 1 ]
138
150
previous = last_messages [- 2 ]
@@ -175,9 +187,15 @@ def stop_listen(self):
175
187
176
188
self .client .loop_stop ()
177
189
178
- self .state .print_status (description = "Stopped listening to all message broker channels." )
190
+ self .state .print_status (
191
+ description = "Stopped listening to all message broker channels." )
179
192
180
- def listen (self , channel = "#" , duration = None , publish_payload = None , stop_count = 1 , message_options = None ):
193
+ def listen (self ,
194
+ channel = "#" ,
195
+ duration = None ,
196
+ publish_payload = None ,
197
+ stop_count = 1 ,
198
+ message_options = None ):
181
199
"""Listen to a message broker channel for the provided duration in seconds."""
182
200
publish = publish_payload is not None
183
201
message = (publish_payload or {}).get ("body" , [{}])[0 ]
@@ -220,14 +238,16 @@ def listen(self, channel="#", duration=None, publish_payload=None, stop_count=1,
220
238
self .start_listen (channel , message_options )
221
239
if not self .state .test_env :
222
240
if channel == "#" :
223
- self .state .last_messages = {"#" : []}
241
+ self .state .last_messages = {"#" : []}
224
242
else :
225
243
self .state .last_messages [channel ] = []
226
244
if publish :
227
- time .sleep (0.1 ) # wait for start_listen to be ready
245
+ time .sleep (0.1 ) # wait for start_listen to be ready
228
246
device_id_str = self .state .token ["token" ]["unencoded" ]["bot" ]
229
- publish_topic = f"bot/{ device_id_str } /from_clients"
230
- self .client .publish (publish_topic , payload = json .dumps (publish_payload ))
247
+ publish_topic = f"bot/{ device_id_str } /from_clients"
248
+ self .client .publish (
249
+ publish_topic ,
250
+ payload = json .dumps (publish_payload ))
231
251
self .state .print_status (update_only = True , description = "" , end = "" )
232
252
while (datetime .now () - start_time ).seconds < duration_seconds :
233
253
self .state .print_status (update_only = True , description = "." , end = "" )
@@ -240,20 +260,26 @@ def listen(self, channel="#", duration=None, publish_payload=None, stop_count=1,
240
260
continue
241
261
if len (last_messages ) > (stop_count - 1 ):
242
262
seconds = (datetime .now () - start_time ).seconds
243
- prefix = "Message" if stop_count == 1 else f"{ stop_count } messages"
263
+ prefix = f"{ stop_count } messages"
264
+ if stop_count == 1 :
265
+ prefix = "Message"
266
+ description = f"{ prefix } received after { seconds } seconds"
244
267
self .state .print_status (
245
- description = f" { prefix } received after { seconds } seconds" ,
268
+ description = description ,
246
269
update_only = True )
247
270
break
248
271
if len (self .state .last_messages .get (channel , [])) == 0 :
249
272
self .state .print_status (description = "" , update_only = True )
273
+ secs = duration_seconds
274
+ description = f"Did not receive message after { secs } seconds"
250
275
self .state .print_status (
251
- description = f"Did not receive message after { duration_seconds } seconds" ,
276
+ description = description ,
252
277
update_only = True )
253
278
self .state .error = "Timed out waiting for RPC response."
254
279
255
280
self .stop_listen ()
256
281
282
+
257
283
def difference (next_state , prev_state ):
258
284
"""Find the difference between two states."""
259
285
is_different = False
@@ -267,7 +293,9 @@ def difference(next_state, prev_state):
267
293
prev_value = prev_state [key ]
268
294
if next_value != prev_value :
269
295
if isinstance (next_value , dict ) and isinstance (prev_value , dict ):
270
- nested_diff , nested_is_different = difference (next_value , prev_value )
296
+ nested_diff , nested_is_different = difference (
297
+ next_value ,
298
+ prev_value )
271
299
if nested_is_different :
272
300
diff [key ] = nested_diff
273
301
is_different = True
0 commit comments