
Commit d4c440b

fix(pubsub): fix messages delivered multiple times despite a long ACK deadline (#9525)
* fix(pubsub): lease-manage all received messages

  This is to prevent the messages that are put on hold from unnecessarily
  timing out too soon, causing the backend to re-send them.

* Exclude on-hold messages from load calculation

  Even received messages that exceed the maximum load (as defined by flow
  control) must be lease-managed to avoid unnecessary ACK deadline
  expirations, but since they are not (yet) dispatched to user callbacks,
  they should not contribute to the overall load. Without this change, the
  total load could be overestimated, resulting in an indefinitely paused
  message stream, and messages not being dispatched to callbacks when they
  should be.

* Use histogram to set default stream ACK deadline

  With all the messages lease-managed (even those on hold), there is no need
  to have a fixed default value.

* Add warning if internal bytes count is negative

  This should not happen, but if it does, it is a bug in the
  StreamingPullManager logic, and we should know about it.
1 parent 0a9665e commit d4c440b
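The failure mode addressed here is easiest to picture from the subscriber side. The sketch below is illustrative only and not taken from this commit: the subscription path, the 90-second sleep, and FlowControl(max_messages=1) are assumptions chosen to mimic the reported scenario, where a callback runs longer than the old fixed 60-second stream ACK deadline and a held-back message's deadline lapses, so the backend redelivers it.

import time

from google.cloud import pubsub_v1

# Hypothetical subscription path; replace with a real one when trying this out.
SUBSCRIPTION_PATH = "projects/my-project/subscriptions/my-subscription"


def slow_callback(message):
    # Simulate processing that outlasts the old fixed 60s stream ACK deadline.
    time.sleep(90)
    message.ack()


subscriber = pubsub_v1.SubscriberClient()
# max_messages=1 forces every other received message onto the internal
# "on hold" queue, which is exactly where the ACK deadline used to lapse.
flow_control = pubsub_v1.types.FlowControl(max_messages=1)
streaming_pull_future = subscriber.subscribe(
    SUBSCRIPTION_PATH, slow_callback, flow_control=flow_control
)

With the fix, messages sitting on the hold queue are lease-managed like any other, so their deadlines keep being extended while the slow callback works through the backlog.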

File tree

3 files changed: +112 −80 lines changed


pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

+55 −46
@@ -51,13 +51,6 @@
 _RESUME_THRESHOLD = 0.8
 """The load threshold below which to resume the incoming message stream."""
 
-_DEFAULT_STREAM_ACK_DEADLINE = 60
-"""The default message acknowledge deadline in seconds for incoming message stream.
-
-This default deadline is dynamically modified for the messages that are added
-to the lease management.
-"""
-
 
 def _maybe_wrap_exception(exception):
     """Wraps a gRPC exception class, if needed."""
@@ -135,9 +128,15 @@ def __init__(
         # because the FlowControl limits have been hit.
         self._messages_on_hold = queue.Queue()
 
+        # the total number of bytes consumed by the messages currently on hold
+        self._on_hold_bytes = 0
+
         # A lock ensuring that pausing / resuming the consumer are both atomic
         # operations that cannot be executed concurrently. Needed for properly
-        # syncing these operations with the current leaser load.
+        # syncing these operations with the current leaser load. Additionally,
+        # the lock is used to protect modifications of internal data that
+        # affects the load computation, i.e. the count and size of the messages
+        # currently on hold.
         self._pause_resume_lock = threading.Lock()
 
         # The threads created in ``.open()``.
@@ -218,10 +217,18 @@ def load(self):
         if self._leaser is None:
             return 0.0
 
+        # Messages that are temporarily put on hold are not being delivered to
+        # user's callbacks, thus they should not contribute to the flow control
+        # load calculation.
+        # However, since these messages must still be lease-managed to avoid
+        # unnecessary ACK deadline expirations, their count and total size must
+        # be subtracted from the leaser's values.
         return max(
             [
-                self._leaser.message_count / self._flow_control.max_messages,
-                self._leaser.bytes / self._flow_control.max_bytes,
+                (self._leaser.message_count - self._messages_on_hold.qsize())
+                / self._flow_control.max_messages,
+                (self._leaser.bytes - self._on_hold_bytes)
+                / self._flow_control.max_bytes,
             ]
         )
 
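As a rough worked example of the revised load formula (the numbers are illustrative, not taken from the commit):

# FlowControl(max_messages=10, max_bytes=1000); the leaser tracks everything
# received, including 7 messages / 700 bytes that are still on hold.
max_messages, max_bytes = 10, 1000
leased_count, leased_bytes = 10, 1000
on_hold_count, on_hold_bytes = 7, 700

load = max(
    (leased_count - on_hold_count) / max_messages,  # 0.3
    (leased_bytes - on_hold_bytes) / max_bytes,     # 0.3
)
# load == 0.3: the stream is not treated as overloaded even though all 10
# messages are lease-managed. Counting the on-hold messages toward the load
# would read 1.0 and keep the stream paused indefinitely.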
@@ -292,13 +299,19 @@ def _maybe_release_messages(self):
             except queue.Empty:
                 break
 
-            self.leaser.add(
-                [requests.LeaseRequest(ack_id=msg.ack_id, byte_size=msg.size)]
-            )
+            self._on_hold_bytes -= msg.size
+
+            if self._on_hold_bytes < 0:
+                _LOGGER.warning(
+                    "On hold bytes was unexpectedly negative: %s", self._on_hold_bytes
+                )
+                self._on_hold_bytes = 0
+
             _LOGGER.debug(
-                "Released held message to leaser, scheduling callback for it, "
-                "still on hold %s.",
+                "Released held message, scheduling callback for it, "
+                "still on hold %s (bytes %s).",
                 self._messages_on_hold.qsize(),
+                self._on_hold_bytes,
             )
             self._scheduler.schedule(self._callback, msg)
 
@@ -392,17 +405,7 @@ def open(self, callback, on_callback_error):
         )
 
         # Create the RPC
-
-        # We must use a fixed value for the ACK deadline, as we cannot read it
-        # from the subscription. The latter would require `pubsub.subscriptions.get`
-        # permission, which is not granted to the default subscriber role
-        # `roles/pubsub.subscriber`.
-        # See also https://github.com/googleapis/google-cloud-python/issues/9339
-        #
-        # When dynamic lease management is enabled for the "on hold" messages,
-        # the default stream ACK deadline should again be set based on the
-        # historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`.
-        stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE
+        stream_ack_deadline_seconds = self.ack_histogram.percentile(99)
 
         get_initial_request = functools.partial(
             self._get_initial_request, stream_ack_deadline_seconds
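The stream ACK deadline now comes from the 99th percentile of observed ACK latencies instead of a fixed constant. A minimal sketch of that behavior, assuming the internal Histogram class keeps its add()/percentile() interface and its 10-second floor (the value the unit test below refers to as the default):

from google.cloud.pubsub_v1.subscriber._protocol import histogram

hist = histogram.Histogram()
print(hist.percentile(99))  # no data yet -> the 10s minimum/default deadline

for seconds in (12, 15, 20, 35):  # pretend these ACK round-trip times were observed
    hist.add(seconds)
print(hist.percentile(99))  # now tracks the slow tail of observed ACKs (35s here)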
@@ -540,40 +543,46 @@ def _on_response(self, response):
         the callback for each message using the executor.
         """
         _LOGGER.debug(
-            "Processing %s received message(s), currenty on hold %s.",
+            "Processing %s received message(s), currenty on hold %s (bytes %s).",
             len(response.received_messages),
             self._messages_on_hold.qsize(),
+            self._on_hold_bytes,
         )
 
+        # Immediately (i.e. without waiting for the auto lease management)
+        # modack the messages we received, as this tells the server that we've
+        # received them.
+        items = [
+            requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
+            for message in response.received_messages
+        ]
+        self._dispatcher.modify_ack_deadline(items)
+
         invoke_callbacks_for = []
 
         for received_message in response.received_messages:
             message = google.cloud.pubsub_v1.subscriber.message.Message(
                 received_message.message, received_message.ack_id, self._scheduler.queue
             )
-            if self.load < _MAX_LOAD:
-                req = requests.LeaseRequest(
-                    ack_id=message.ack_id, byte_size=message.size
-                )
-                self.leaser.add([req])
-                invoke_callbacks_for.append(message)
-                self.maybe_pause_consumer()
-            else:
-                self._messages_on_hold.put(message)
-
-        # Immediately (i.e. without waiting for the auto lease management)
-        # modack the messages we received and not put on hold, as this tells
-        # the server that we've received them.
-        items = [
-            requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
-            for message in invoke_callbacks_for
-        ]
-        self._dispatcher.modify_ack_deadline(items)
+            # Making a decision based on the load, and modifying the data that
+            # affects the load -> needs a lock, as that state can be modified
+            # by different threads.
+            with self._pause_resume_lock:
+                if self.load < _MAX_LOAD:
+                    invoke_callbacks_for.append(message)
+                else:
+                    self._messages_on_hold.put(message)
+                    self._on_hold_bytes += message.size
+
+                req = requests.LeaseRequest(ack_id=message.ack_id, byte_size=message.size)
+                self.leaser.add([req])
+                self.maybe_pause_consumer()
 
         _LOGGER.debug(
-            "Scheduling callbacks for %s new messages, new total on hold %s.",
+            "Scheduling callbacks for %s new messages, new total on hold %s (bytes %s).",
             len(invoke_callbacks_for),
             self._messages_on_hold.qsize(),
+            self._on_hold_bytes,
         )
         for msg in invoke_callbacks_for:
             self._scheduler.schedule(self._callback, msg)

pubsub/tests/system.py

+6 −10
@@ -382,10 +382,6 @@ class CallbackError(Exception):
         with pytest.raises(CallbackError):
             future.result(timeout=30)
 
-    @pytest.mark.xfail(
-        reason="The default stream ACK deadline is static and received messages "
-        "exceeding FlowControl.max_messages are currently not lease managed."
-    )
     def test_streaming_pull_ack_deadline(
         self, publisher, subscriber, project, topic_path, subscription_path, cleanup
     ):
@@ -400,29 +396,29 @@ def test_streaming_pull_ack_deadline(
         # Subscribe to the topic. This must happen before the messages
         # are published.
         subscriber.create_subscription(
-            subscription_path, topic_path, ack_deadline_seconds=240
+            subscription_path, topic_path, ack_deadline_seconds=45
         )
 
         # publish some messages and wait for completion
         self._publish_messages(publisher, topic_path, batch_sizes=[2])
 
         # subscribe to the topic
         callback = StreamingPullCallback(
-            processing_time=70,  # more than the default stream ACK deadline (60s)
+            processing_time=13,  # more than the default stream ACK deadline (10s)
             resolve_at_msg_count=3,  # one more than the published messages count
         )
         flow_control = types.FlowControl(max_messages=1)
         subscription_future = subscriber.subscribe(
             subscription_path, callback, flow_control=flow_control
         )
 
-        # We expect to process the first two messages in 2 * 70 seconds, and
+        # We expect to process the first two messages in 2 * 13 seconds, and
         # any duplicate message that is re-sent by the backend in additional
-        # 70 seconds, totalling 210 seconds (+ overhead) --> if there have been
-        # no duplicates in 240 seconds, we can reasonably assume that there
+        # 13 seconds, totalling 39 seconds (+ overhead) --> if there have been
+        # no duplicates in 60 seconds, we can reasonably assume that there
         # won't be any.
         try:
-            callback.done_future.result(timeout=240)
+            callback.done_future.result(timeout=60)
         except exceptions.TimeoutError:
             # future timed out, because we received no excessive messages
             assert sorted(callback.seen_message_ids) == [1, 2]
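The StreamingPullCallback helper used in this test is defined elsewhere in pubsub/tests/system.py and is not part of the diff. Roughly, it behaves like the sketch below; the attribute names match their use in the test, but the internals are assumptions.

import concurrent.futures
import time


class StreamingPullCallback(object):
    """Callable test helper: sleeps per message, records IDs, resolves a future."""

    def __init__(self, processing_time, resolve_at_msg_count):
        self._processing_time = processing_time
        self._resolve_at_msg_count = resolve_at_msg_count
        self.seen_message_ids = []
        self.done_future = concurrent.futures.Future()

    def __call__(self, message):
        time.sleep(self._processing_time)  # simulate slow processing
        self.seen_message_ids.append(int(message.data))
        message.ack()

        if len(self.seen_message_ids) >= self._resolve_at_msg_count:
            # Only resolves if more messages arrive than were published,
            # i.e. if the backend redelivered something.
            self.done_future.set_result(None)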

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

+51 −24
@@ -233,13 +233,15 @@ def test__maybe_release_messages_on_overload():
     manager = make_manager(
         flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
     )
-    # Ensure load is exactly 1.0 (to verify that >= condition is used)
-    _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
-    _leaser.message_count = 10
-    _leaser.bytes = 1000
 
     msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=11)
     manager._messages_on_hold.put(msg)
+    manager._on_hold_bytes = msg.size
+
+    # Ensure load is exactly 1.0 (to verify that >= condition is used)
+    _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
+    _leaser.message_count = 10
+    _leaser.bytes = 1000 + msg.size
 
     manager._maybe_release_messages()
 
@@ -254,18 +256,20 @@ def test__maybe_release_messages_below_overload():
     )
     manager._callback = mock.sentinel.callback
 
-    # init leaser message count to 8 to leave room for 2 more messages
+    # Init leaser message count to 11, so that when subtracting the 3 messages
+    # that are on hold, there is still room for another 2 messages before the
+    # max load is hit.
     _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
-    fake_leaser_add(_leaser, init_msg_count=8, assumed_msg_size=25)
-    _leaser.add = mock.Mock(wraps=_leaser.add)  # to spy on calls
+    fake_leaser_add(_leaser, init_msg_count=11, assumed_msg_size=10)
 
     messages = [
-        mock.create_autospec(message.Message, instance=True, ack_id="ack_foo", size=11),
-        mock.create_autospec(message.Message, instance=True, ack_id="ack_bar", size=22),
-        mock.create_autospec(message.Message, instance=True, ack_id="ack_baz", size=33),
+        mock.create_autospec(message.Message, instance=True, ack_id="ack_foo", size=10),
+        mock.create_autospec(message.Message, instance=True, ack_id="ack_bar", size=10),
+        mock.create_autospec(message.Message, instance=True, ack_id="ack_baz", size=10),
     ]
     for msg in messages:
         manager._messages_on_hold.put(msg)
+    manager._on_hold_bytes = 3 * 10
 
     # the actual call of MUT
     manager._maybe_release_messages()
@@ -274,13 +278,6 @@ def test__maybe_release_messages_below_overload():
     msg = manager._messages_on_hold.get_nowait()
     assert msg.ack_id == "ack_baz"
 
-    assert len(_leaser.add.mock_calls) == 2
-    expected_calls = [
-        mock.call([requests.LeaseRequest(ack_id="ack_foo", byte_size=11)]),
-        mock.call([requests.LeaseRequest(ack_id="ack_bar", byte_size=22)]),
-    ]
-    _leaser.add.assert_has_calls(expected_calls)
-
     schedule_calls = manager._scheduler.schedule.mock_calls
     assert len(schedule_calls) == 2
     for _, call_args, _ in schedule_calls:
@@ -289,6 +286,34 @@ def test__maybe_release_messages_below_overload():
         assert call_args[1].ack_id in ("ack_foo", "ack_bar")
 
 
+def test__maybe_release_messages_negative_on_hold_bytes_warning(caplog):
+    manager = make_manager(
+        flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
+    )
+
+    msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=17)
+    manager._messages_on_hold.put(msg)
+    manager._on_hold_bytes = 5  # too low for some reason
+
+    _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
+    _leaser.message_count = 3
+    _leaser.bytes = 150
+
+    with caplog.at_level(logging.WARNING):
+        manager._maybe_release_messages()
+
+    expected_warnings = [
+        record.message.lower()
+        for record in caplog.records
+        if "unexpectedly negative" in record.message
+    ]
+    assert len(expected_warnings) == 1
+    assert "on hold bytes" in expected_warnings[0]
+    assert "-12" in expected_warnings[0]
+
+    assert manager._on_hold_bytes == 0  # should be auto-corrected
+
+
 def test_send_unary():
     manager = make_manager()
     manager._UNARY_REQUESTS = True
@@ -404,8 +429,6 @@ def test_heartbeat_inactive():
     "google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
 )
 def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
-    stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE
-
     manager = make_manager()
 
     manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)
@@ -435,7 +458,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
     )
     initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
    assert initial_request_arg.func == manager._get_initial_request
-    assert initial_request_arg.args[0] == stream_ack_deadline
+    assert initial_request_arg.args[0] == 10  # the default stream ACK timeout
     assert not manager._client.api.get_subscription.called
 
     resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
@@ -668,13 +691,17 @@ def test__on_response_with_leaser_overload():
     # are called in the expected way.
     manager._on_response(response)
 
-    # only the messages that are added to the lease management and dispatched to
-    # callbacks should have their ACK deadline extended
+    # all messages should be added to the lease management and have their ACK
+    # deadline extended, even those not dispatched to callbacks
     dispatcher.modify_ack_deadline.assert_called_once_with(
-        [requests.ModAckRequest("fack", 10)]
+        [
+            requests.ModAckRequest("fack", 10),
+            requests.ModAckRequest("back", 10),
+            requests.ModAckRequest("zack", 10),
+        ]
    )
 
-    # one message should be scheduled, the leaser capacity allows for it
+    # one message should be scheduled, the flow control limits allow for it
     schedule_calls = scheduler.schedule.mock_calls
     assert len(schedule_calls) == 1
     call_args = schedule_calls[0][1]
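The fake_leaser_add() helper used above is defined near the top of this test module and is not shown in the diff. A plausible sketch of what it does, under the assumption that it simply makes the autospec'd Leaser keep its message_count and bytes bookkeeping consistent as messages are added:

def fake_leaser_add(leaser_fake, init_msg_count=0, assumed_msg_size=10):
    """Wire a mocked Leaser so that add() updates its count/bytes bookkeeping."""

    def fake_add(items):
        leaser_fake.message_count += len(items)
        leaser_fake.bytes += len(items) * assumed_msg_size

    leaser_fake.message_count = init_msg_count
    leaser_fake.bytes = init_msg_count * assumed_msg_size
    leaser_fake.add.side_effect = fake_add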
