66
66
_empty_correlation_id = b"\0 " * 16
67
67
68
68
69
+ from . import core # circular import...
70
+
71
+
69
72
class SendingMessage :
70
73
"""Wire protocol message that will be sent."""
74
+
71
75
def __init__ (self , msgtype , flags , seq , serializer_id , payload , annotations = None ):
72
76
self .type = msgtype
73
77
self .seq = seq
@@ -82,7 +86,6 @@ def __init__(self, msgtype, flags, seq, serializer_id, payload, annotations=None
82
86
total_size = len (payload ) + annotations_size
83
87
if total_size > config .MAX_MESSAGE_SIZE :
84
88
raise errors .ProtocolError ("message too large ({:d}, max={:d})" .format (total_size , config .MAX_MESSAGE_SIZE ))
85
- from . import core # XXX circular
86
89
if core .current_context .correlation_id :
87
90
flags |= FLAGS_CORR_ID
88
91
self .corr_id = core .current_context .correlation_id .bytes
@@ -178,12 +181,13 @@ def log_wiredata(logger, text, msg):
178
181
(text , msg .type , msg .flags , msg .serializer_id , msg .seq , num_anns , corr_id , bytes (msg .data )))
179
182
180
183
181
- def recv_stub (connection , accepted_msgtypes = None ): # @todo decouple i/o from actual protocol logic
184
+ def recv_stub (connection , accepted_msgtypes = None ):
182
185
"""
183
186
Receives a pyro message from a given connection.
184
187
Accepts the given message types (None=any, or pass a sequence).
185
188
Also reads annotation chunks and the actual payload data.
186
189
"""
190
+ # TODO decouple i/o from actual protocol logic, so that the protocol can be easily unit tested
187
191
header = connection .recv (6 ) # 'PYRO' + 2 bytes protocol version
188
192
ReceivingMessage .validate (header )
189
193
header += connection .recv (_header_size - 6 )
0 commit comments