Skip to content

Commit 4b19d1d

Browse files
committed
Ignore all methods except Close and Close-OK when channel/connection is closing
1 parent 02e1400 commit 4b19d1d

File tree

5 files changed

+187
-4
lines changed

5 files changed

+187
-4
lines changed

amqp/channel.py

+20
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131
consumer_tag=%r exchange=%r routing_key=%r.\
3232
"""
3333

34+
IGNORED_METHOD_DURING_CHANNEL_CLOSE = """\
35+
Received method %s during closing channel. This method will be ignored\
36+
"""
37+
3438

3539
class VDeprecationWarning(DeprecationWarning):
3640
pass
@@ -108,6 +112,7 @@ def __init__(self, connection,
108112

109113
super(Channel, self).__init__(connection, channel_id)
110114

115+
self.is_closing = False
111116
self.is_open = False
112117
self.active = True # Flow control
113118
self.returned_messages = Queue()
@@ -159,6 +164,21 @@ def collect(self):
159164
self.events.clear()
160165
self.no_ack_consumers.clear()
161166

167+
def dispatch_method(self, method_sig, payload, content):
168+
if self.is_closing and method_sig not in (
169+
spec.Channel.Close, spec.Channel.CloseOk
170+
):
171+
# When channel.close() was called we must ignore all methods except
172+
# Channel.close and Channel.CloseOk
173+
AMQP_LOGGER.warning(
174+
IGNORED_METHOD_DURING_CHANNEL_CLOSE, method_sig
175+
)
176+
return
177+
else:
178+
return super(Channel, self).dispatch_method(
179+
method_sig, payload, content
180+
)
181+
162182
def _do_revive(self):
163183
self.is_open = False
164184
self.open()

amqp/connection.py

+22-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ class SSLError(Exception): # noqa
3939
Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s
4040
""".strip()
4141

42+
IGNORED_METHOD_DURING_CONN_CLOSE = """\
43+
Received method %s during closing connection. This method will be ignored\
44+
"""
45+
4246
__all__ = ['Connection']
4347

4448
AMQP_LOGGER = logging.getLogger('amqp')
@@ -233,6 +237,7 @@ def __init__(self, host='localhost:5672', userid='guest', password='guest',
233237
self.frame_handler_cls = frame_handler
234238
self.frame_writer_cls = frame_writer
235239

240+
self.is_closing = False
236241
self._handshake_complete = False
237242

238243
self.channels = {}
@@ -292,6 +297,21 @@ def _setup_listeners(self):
292297
spec.Connection.CloseOk: self._on_close_ok,
293298
})
294299

300+
def dispatch_method(self, method_sig, payload, content):
301+
if self.is_closing and method_sig not in (
302+
spec.Connection.Close, spec.Connection.CloseOk
303+
):
304+
# When Connection.close() was called we must ignore
305+
# all methods except Connection.close and Connection.CloseOk
306+
AMQP_LOGGER.warning(
307+
IGNORED_METHOD_DURING_CONN_CLOSE, method_sig
308+
)
309+
return
310+
else:
311+
return super(Connection, self).dispatch_method(
312+
method_sig, payload, content
313+
)
314+
295315
def connect(self, callback=None):
296316
# Let the transport.py module setup the actual
297317
# socket connection to the broker.
@@ -576,10 +596,11 @@ def close(self, reply_code=0, reply_text='', method_sig=(0, 0),
576596
wait=spec.Connection.CloseOk,
577597
)
578598
except (OSError, IOError, SSLError):
579-
self.is_closing = False
580599
# close connection
581600
self.collect()
582601
raise
602+
finally:
603+
self.is_closing = False
583604

584605
def _on_close(self, reply_code, reply_text, class_id, method_id):
585606
"""Request a connection close.

t/integration/test_integration.py

+68
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,74 @@ def test_connection_methods(self, method, callback):
413413
conn.drain_events(0)
414414
callback_mock.assert_called_once()
415415

416+
def test_channel_ignore_methods_during_close(self):
417+
# Test checking that py-amqp will discard any received methods
418+
# except Close and Close-OK after sending Channel.Close method
419+
# to server.
420+
frame_writer_cls_mock = Mock()
421+
conn = Connection(frame_writer=frame_writer_cls_mock)
422+
consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'
423+
with patch.object(conn, 'Transport') as transport_mock:
424+
handshake(conn, transport_mock)
425+
426+
channel_id = 1
427+
transport_mock().read_frame.side_effect = [
428+
# Inject Open Handshake
429+
build_frame_type_1(
430+
spec.Channel.OpenOk,
431+
channel=channel_id,
432+
args=(1, False),
433+
arg_format='Lb'
434+
),
435+
# Inject basic-deliver response
436+
build_frame_type_1(
437+
spec.Basic.Deliver,
438+
channel=1,
439+
arg_format='sLbss',
440+
args=(
441+
# consumer-tag, delivery-tag, redelivered,
442+
consumer_tag, 1, False,
443+
# exchange-name, routing-key
444+
'foo_exchange', 'routing-key'
445+
)
446+
),
447+
build_frame_type_2(
448+
channel=1,
449+
body_len=12,
450+
properties=b'0\x00\x00\x00\x00\x00\x01'
451+
),
452+
build_frame_type_3(
453+
channel=1,
454+
body=b'Hello World!'
455+
),
456+
# Inject close method
457+
build_frame_type_1(
458+
spec.Channel.CloseOk,
459+
channel=channel_id
460+
),
461+
]
462+
463+
frame_writer_mock = frame_writer_cls_mock()
464+
frame_writer_mock.reset_mock()
465+
466+
with patch('amqp.Channel._on_basic_deliver') as on_deliver_mock:
467+
ch = conn.channel(channel_id=channel_id)
468+
ch.close()
469+
on_deliver_mock.assert_not_called()
470+
frame_writer_mock.assert_has_calls(
471+
[
472+
call(
473+
1, 1, spec.Channel.Open, dumps('s', ('',)),
474+
None
475+
),
476+
call(
477+
1, 1, spec.Channel.Close, dumps('BsBB', (0, '', 0, 0)),
478+
None
479+
)
480+
]
481+
)
482+
assert ch.is_open is False
483+
416484
def test_channel_open_close(self):
417485
# Test checking opening and closing channel
418486
frame_writer_cls_mock = Mock()

t/unit/test_channel.py

+31-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from amqp import spec
99
from amqp.basic_message import Message
10-
from amqp.channel import Channel
10+
from amqp.channel import Channel, IGNORED_METHOD_DURING_CHANNEL_CLOSE
1111
from amqp.exceptions import (ConsumerCancelled, MessageNacked, NotFound,
1212
RecoverableConnectionError)
1313
from amqp.platform import pack
@@ -26,6 +26,9 @@ def setup_conn(self):
2626
self.c = Channel(self.conn, 1)
2727
self.c.send_method = Mock(name='send_method')
2828

29+
def test_init_is_closing(self):
30+
assert self.c.is_closing is False
31+
2932
def test_init_confirm_enabled(self):
3033
self.conn.confirm_publish = True
3134
c = Channel(self.conn, 2)
@@ -88,6 +91,7 @@ def test_close(self):
8891
(30, 'text', spec.Queue.Declare[0], spec.Queue.Declare[1]),
8992
wait=spec.Channel.CloseOk,
9093
)
94+
assert self.c.is_closing is False
9195
assert self.c.connection is None
9296

9397
def test_on_close(self):
@@ -598,3 +602,29 @@ def test_on_basic_nack(self):
598602
self.c.events['basic_nack'].add(callback)
599603
self.c._on_basic_nack(123, True)
600604
callback.assert_called_with(123, True)
605+
606+
@pytest.mark.parametrize(
607+
"method",
608+
(spec.Channel.Close, spec.Channel.CloseOk, spec.Basic.Deliver)
609+
)
610+
def test_dispatch_method_close(self, method):
611+
with patch('amqp.channel.AbstractChannel.dispatch_method') as dm_mock:
612+
self.c.dispatch_method(method, 'aa', 'bb')
613+
dm_mock.assert_called_once_with(method, 'aa', 'bb')
614+
615+
@pytest.mark.parametrize(
616+
"method",
617+
(spec.Channel.Close, spec.Channel.CloseOk, spec.Basic.Deliver)
618+
)
619+
def test_dispatch_method_closing_channel(self, method, caplog):
620+
self.c.is_closing = True
621+
with patch('amqp.channel.AbstractChannel.dispatch_method') as dm_mock:
622+
self.c.dispatch_method(method, 'aa', 'bb')
623+
if method in (spec.Channel.Close, spec.Channel.CloseOk):
624+
dm_mock.assert_called_once_with(method, 'aa', 'bb')
625+
else:
626+
dm_mock.assert_not_called()
627+
assert caplog.records[0].msg == \
628+
IGNORED_METHOD_DURING_CHANNEL_CLOSE
629+
assert caplog.records[0].args[0] == method
630+
assert caplog.records[0].levelname == 'WARNING'

t/unit/test_connection.py

+46-2
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
import pytest
77

88
from amqp import Connection, spec
9-
from amqp.connection import SSLError
9+
from amqp.connection import SSLError, IGNORED_METHOD_DURING_CONN_CLOSE
1010
from amqp.exceptions import ConnectionError, NotFound, ResourceError
1111
from amqp.five import items
1212
from amqp.sasl import AMQPLAIN, EXTERNAL, GSSAPI, PLAIN, SASL
1313
from amqp.transport import TCPTransport
14-
from case import ContextMock, Mock, call, patch
14+
from case import ContextMock, Mock, call, patch, sentinel
1515

1616

1717
class test_Connection:
@@ -491,3 +491,47 @@ def test_heartbeat_tick(self):
491491
def test_server_capabilities(self):
492492
self.conn.server_properties['capabilities'] = {'foo': 1}
493493
assert self.conn.server_capabilities == {'foo': 1}
494+
495+
@pytest.mark.parametrize(
496+
"method",
497+
(
498+
spec.Connection.Close,
499+
spec.Connection.CloseOk,
500+
spec.Connection.TuneOk
501+
)
502+
)
503+
def test_dispatch_method(self, method):
504+
mocked_method = 'amqp.connection.AbstractChannel.dispatch_method'
505+
with patch(mocked_method) as dm_mock:
506+
self.conn.dispatch_method(
507+
method, sentinel.PAYLOAD, sentinel.CONTENT
508+
)
509+
dm_mock.assert_called_once_with(
510+
method, sentinel.PAYLOAD, sentinel.CONTENT
511+
)
512+
513+
@pytest.mark.parametrize(
514+
"method",
515+
(
516+
spec.Connection.Close,
517+
spec.Connection.CloseOk,
518+
spec.Connection.TuneOk
519+
)
520+
)
521+
def test_dispatch_method_closing_connection(self, method, caplog):
522+
mocked_method = 'amqp.connection.AbstractChannel.dispatch_method'
523+
self.conn.is_closing = True
524+
with patch(mocked_method) as dm_mock:
525+
self.conn.dispatch_method(
526+
method, sentinel.PAYLOAD, sentinel.CONTENT
527+
)
528+
if method in (spec.Connection.Close, spec.Connection.CloseOk):
529+
dm_mock.assert_called_once_with(
530+
method, sentinel.PAYLOAD, sentinel.CONTENT
531+
)
532+
else:
533+
dm_mock.assert_not_called()
534+
assert caplog.records[0].msg == \
535+
IGNORED_METHOD_DURING_CONN_CLOSE
536+
assert caplog.records[0].args[0] == method
537+
assert caplog.records[0].levelname == 'WARNING'

0 commit comments

Comments
 (0)