6
6
# ├── [BROKER] connect()
7
7
# ├── [BROKER] disconnect()
8
8
# ├── [BROKER] publish()
9
- # ├── [BROKER] on_connect()
10
- # ├── [BROKER] on_message
11
9
# ├── [BROKER] start_listen()
12
10
# ├── [BROKER] stop_listen()
13
11
# └── [BROKER] listen()
14
12
15
13
import time
16
14
import json
15
+ import uuid
17
16
from datetime import datetime
18
17
import paho .mqtt .client as mqtt
19
18
@@ -30,12 +29,12 @@ def connect(self):
30
29
31
30
self .client = mqtt .Client ()
32
31
self .client .username_pw_set (
33
- username = self .state .token [' token' ][ ' unencoded' ][ ' bot' ],
34
- password = self .state .token [' token' ][ ' encoded' ]
32
+ username = self .state .token [" token" ][ " unencoded" ][ " bot" ],
33
+ password = self .state .token [" token" ][ " encoded" ]
35
34
)
36
35
37
36
self .client .connect (
38
- self .state .token [' token' ][ ' unencoded' ][ ' mqtt' ],
37
+ self .state .token [" token" ][ " unencoded" ][ " mqtt" ],
39
38
port = 1883 ,
40
39
keepalive = 60
41
40
)
@@ -63,7 +62,7 @@ def wrap_message(self, message, priority=None):
63
62
}
64
63
65
64
if priority is not None :
66
- rpc [' args' ][ ' priority' ] = priority
65
+ rpc [" args" ][ " priority" ] = priority
67
66
68
67
return rpc
69
68
@@ -73,80 +72,118 @@ def publish(self, message):
73
72
if self .client is None :
74
73
self .connect ()
75
74
76
- if message ["kind" ] != "rpc_request" :
77
- message = self .wrap_message (message )
75
+ rpc = message
76
+ if rpc ["kind" ] != "rpc_request" :
77
+ rpc = self .wrap_message (rpc )
78
78
79
- device_id_str = self .state .token ["token" ]["unencoded" ]["bot" ]
80
- topic = f"bot/{ device_id_str } /from_clients"
81
- if not self .state .dry_run :
82
- self .client .publish (topic , payload = json .dumps (message ))
83
- self .state .print_status (description = f"Publishing to { topic } :" )
84
- self .state .print_status (endpoint_json = message , update_only = True )
79
+ if rpc ["args" ]["label" ] == "" :
80
+ rpc ["args" ]["label" ] = uuid .uuid4 ().hex if not self .state .test_env else "test"
81
+
82
+ self .state .print_status (description = "Publishing to 'from_clients':" )
83
+ self .state .print_status (endpoint_json = rpc , update_only = True )
85
84
if self .state .dry_run :
86
85
self .state .print_status (description = "Sending disabled, message not sent." , update_only = True )
86
+ else :
87
+ self .listen ("from_device" , publish_payload = rpc )
87
88
88
- def on_connect (self , _client , _userdata , _flags , _rc , channel ):
89
- """Callback function when connection to message broker is successful."""
90
-
91
- self .client .subscribe (
92
- f"bot/{ self .state .token ['token' ]['unencoded' ]['bot' ]} /{ channel } " )
93
-
94
- self .state .print_status (description = f"Connected to message broker channel { channel } " )
95
-
96
- def on_message (self , _client , _userdata , msg , channel ):
97
- """Callback function when message received from message broker."""
98
-
99
- self .state .last_messages [channel ] = json .loads (msg .payload )
89
+ response = self .state .last_messages .get ("from_device" )
90
+ if response is not None :
91
+ if response ["kind" ] == "rpc_ok" :
92
+ self .state .print_status (description = "Success response received." , update_only = True )
93
+ self .state .error = None
94
+ else :
95
+ self .state .print_status (description = "Error response received." , update_only = True )
96
+ self .state .error = "RPC error response received."
100
97
101
- self .state .print_status ( endpoint_json = json . loads ( msg . payload ), description = f"TOPIC: { msg . topic } ( { datetime . now (). strftime ( '%Y-%m-%d %H:%M:%S' ) } ) \n " )
98
+ self .state .last_published = rpc
102
99
103
100
def start_listen (self , channel = "#" ):
104
101
"""Establish persistent subscription to message broker channels."""
105
102
106
103
if self .client is None :
107
104
self .connect ()
108
105
109
- def on_connect ( client , userdata , flags , rc ):
110
- """Wrap on_connect to pass channel argument."""
111
- self . on_connect ( client , userdata , flags , rc , channel )
106
+ # Set on_message callback
107
+ def on_message ( _client , _userdata , msg ):
108
+ """on_message callback"""
112
109
113
- def on_message (client , userdata , msg ):
114
- """Wrap on_message to pass channel argument."""
115
- self .on_message (client , userdata , msg , channel )
110
+ self .state .last_messages [channel ] = json .loads (msg .payload )
111
+
112
+
113
+ self .state .print_status (description = "" , update_only = True )
114
+ timestamp = datetime .now ().strftime ("%Y-%m-%d %H:%M:%S" )
115
+ self .state .print_status (
116
+ endpoint_json = json .loads (msg .payload ),
117
+ description = f"TOPIC: { msg .topic } ({ timestamp } )\n " )
116
118
117
- self .client .on_connect = on_connect
118
119
self .client .on_message = on_message
119
120
121
+ # Subscribe to channel
122
+ device_id_str = self .state .token ["token" ]["unencoded" ]["bot" ]
123
+ self .client .subscribe (f"bot/{ device_id_str } /{ channel } " )
124
+ self .state .print_status (description = f"Connected to message broker channel '{ channel } '" )
125
+
126
+ # Start listening
120
127
self .client .loop_start ()
121
- self .state .print_status (description = f"Now listening to message broker channel { channel } ." )
128
+ self .state .print_status (description = f"Now listening to message broker channel ' { channel } ' ." )
122
129
123
130
def stop_listen (self ):
124
131
"""End subscription to all message broker channels."""
125
132
126
133
self .client .loop_stop ()
127
- self .client .disconnect ()
128
134
129
135
self .state .print_status (description = "Stopped listening to all message broker channels." )
130
136
131
- def listen (self , duration , channel ):
137
+ def listen (self , channel , duration = None , publish_payload = None ):
132
138
"""Listen to a message broker channel for the provided duration in seconds."""
133
- self .state .print_status (description = f"Listening to message broker for { duration } seconds..." )
139
+ # Prepare parameters
140
+ duration_seconds = duration or self .state .broker_listen_duration
141
+ message = (publish_payload or {}).get ("body" , [{}])[0 ]
142
+ if message .get ("kind" ) == "wait" :
143
+ duration_seconds += message ["args" ]["milliseconds" ] / 1000
144
+ publish = publish_payload is not None
145
+ label = None
146
+ if publish and publish_payload ["args" ]["label" ] != "" :
147
+ label = publish_payload ["args" ]["label" ]
148
+
149
+ # Print status message
150
+ channel_str = f" channel '{ channel } '" if channel != "#" else ""
151
+ duration_str = f" for { duration_seconds } seconds"
152
+ label_str = f" for label '{ label } '" if label is not None else ""
153
+ description = f"Listening to message broker{ channel_str } { duration_str } { label_str } ..."
154
+ self .state .print_status (description = description )
155
+
156
+ # Start listening
134
157
start_time = datetime .now ()
135
158
self .start_listen (channel )
136
159
if not self .state .test_env :
137
160
self .state .last_messages [channel ] = None
138
- while (datetime .now () - start_time ).seconds < duration :
161
+ if publish :
162
+ time .sleep (0.1 ) # wait for start_listen to be ready
163
+ device_id_str = self .state .token ["token" ]["unencoded" ]["bot" ]
164
+ publish_topic = f"bot/{ device_id_str } /from_clients"
165
+ self .client .publish (publish_topic , payload = json .dumps (publish_payload ))
166
+ self .state .print_status (update_only = True , description = "" , end = "" )
167
+ while (datetime .now () - start_time ).seconds < duration_seconds :
139
168
self .state .print_status (update_only = True , description = "." , end = "" )
140
169
time .sleep (0.25 )
141
- if self .state .last_messages .get (channel ) is not None :
170
+ last_message = self .state .last_messages .get (channel )
171
+ if last_message is not None :
172
+ # If a label is provided, verify the label matches
173
+ if label is not None and last_message ["args" ]["label" ] != label :
174
+ self .state .last_messages [channel ] = None
175
+ continue
142
176
seconds = (datetime .now () - start_time ).seconds
177
+ self .state .print_status (description = "" , update_only = True )
143
178
self .state .print_status (
144
179
description = f"Message received after { seconds } seconds" ,
145
180
update_only = True )
146
181
break
147
182
if self .state .last_messages .get (channel ) is None :
183
+ self .state .print_status (description = "" , update_only = True )
148
184
self .state .print_status (
149
- description = f"Did not receive message after { duration } seconds" ,
185
+ description = f"Did not receive message after { duration_seconds } seconds" ,
150
186
update_only = True )
187
+ self .state .error = "Timed out waiting for RPC response."
151
188
152
189
self .stop_listen ()
0 commit comments