Skip to content

Commit 6547380

Browse files
committed
Ignore all methods except Close and Close-OK when channel/connection is closing
1 parent 02e1400 commit 6547380

File tree

3 files changed

+96
-1
lines changed

3 files changed

+96
-1
lines changed

amqp/channel.py

+13
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ def __init__(self, connection,
108108

109109
super(Channel, self).__init__(connection, channel_id)
110110

111+
self.is_closing = False
111112
self.is_open = False
112113
self.active = True # Flow control
113114
self.returned_messages = Queue()
@@ -159,6 +160,18 @@ def collect(self):
159160
self.events.clear()
160161
self.no_ack_consumers.clear()
161162

163+
def dispatch_method(self, method_sig, payload, content):
164+
if self.is_closing and method_sig not in (
165+
spec.Channel.Close, spec.Channel.CloseOk
166+
):
167+
# When channel.close() was called we must ignore all methods except
168+
# Channel.close and Channel.CloseOk
169+
return
170+
else:
171+
return super(Channel, self).dispatch_method(
172+
method_sig, payload, content
173+
)
174+
162175
def _do_revive(self):
163176
self.is_open = False
164177
self.open()

amqp/connection.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ def __init__(self, host='localhost:5672', userid='guest', password='guest',
233233
self.frame_handler_cls = frame_handler
234234
self.frame_writer_cls = frame_writer
235235

236+
self.is_closing = False
236237
self._handshake_complete = False
237238

238239
self.channels = {}
@@ -292,6 +293,18 @@ def _setup_listeners(self):
292293
spec.Connection.CloseOk: self._on_close_ok,
293294
})
294295

296+
def dispatch_method(self, method_sig, payload, content):
297+
if self.is_closing and method_sig not in (
298+
spec.Connection.Close, spec.Connection.CloseOk
299+
):
300+
# When Connection.close() was called we must ignore
301+
# all methods except Connection.close and Connection.CloseOk
302+
return
303+
else:
304+
return super(Connection, self).dispatch_method(
305+
method_sig, payload, content
306+
)
307+
295308
def connect(self, callback=None):
296309
# Let the transport.py module setup the actual
297310
# socket connection to the broker.
@@ -576,10 +589,11 @@ def close(self, reply_code=0, reply_text='', method_sig=(0, 0),
576589
wait=spec.Connection.CloseOk,
577590
)
578591
except (OSError, IOError, SSLError):
579-
self.is_closing = False
580592
# close connection
581593
self.collect()
582594
raise
595+
finally:
596+
self.is_closing = False
583597

584598
def _on_close(self, reply_code, reply_text, class_id, method_id):
585599
"""Request a connection close.

t/integration/test_integration.py

+68
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,74 @@ def test_connection_methods(self, method, callback):
413413
conn.drain_events(0)
414414
callback_mock.assert_called_once()
415415

416+
def test_channel_ignore_methods_during_close(self):
417+
# Test checking that py-amqp will discard any received methods
418+
# except Close and Close-OK after sending Channel.Close method
419+
# to server.
420+
frame_writer_cls_mock = Mock()
421+
conn = Connection(frame_writer=frame_writer_cls_mock)
422+
consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'
423+
with patch.object(conn, 'Transport') as transport_mock:
424+
handshake(conn, transport_mock)
425+
426+
channel_id = 1
427+
transport_mock().read_frame.side_effect = [
428+
# Inject Open Handshake
429+
build_frame_type_1(
430+
spec.Channel.OpenOk,
431+
channel=channel_id,
432+
args=(1, False),
433+
arg_format='Lb'
434+
),
435+
# Inject basic-deliver response
436+
build_frame_type_1(
437+
spec.Basic.Deliver,
438+
channel=1,
439+
arg_format='sLbss',
440+
args=(
441+
# consumer-tag, delivery-tag, redelivered,
442+
consumer_tag, 1, False,
443+
# exchange-name, routing-key
444+
'foo_exchange', 'routing-key'
445+
)
446+
),
447+
build_frame_type_2(
448+
channel=1,
449+
body_len=12,
450+
properties=b'0\x00\x00\x00\x00\x00\x01'
451+
),
452+
build_frame_type_3(
453+
channel=1,
454+
body=b'Hello World!'
455+
),
456+
# Inject close method
457+
build_frame_type_1(
458+
spec.Channel.CloseOk,
459+
channel=channel_id
460+
),
461+
]
462+
463+
frame_writer_mock = frame_writer_cls_mock()
464+
frame_writer_mock.reset_mock()
465+
466+
with patch('amqp.Channel._on_basic_deliver') as on_deliver_mock:
467+
ch = conn.channel(channel_id=channel_id)
468+
ch.close()
469+
on_deliver_mock.assert_not_called()
470+
frame_writer_mock.assert_has_calls(
471+
[
472+
call(
473+
1, 1, spec.Channel.Open, dumps('s', ('',)),
474+
None
475+
),
476+
call(
477+
1, 1, spec.Channel.Close, dumps('BsBB', (0, '', 0, 0)),
478+
None
479+
)
480+
]
481+
)
482+
assert ch.is_open is False
483+
416484
def test_channel_open_close(self):
417485
# Test checking opening and closing channel
418486
frame_writer_cls_mock = Mock()

0 commit comments

Comments
 (0)