Skip to content

Commit 2bb22ae

Browse files
authored
fix(pubsub): set default stream ACK deadline to subscriptions' (#9268)
* feat(pubsub): set default stream ACK deadline to subscriptions'

When subscribing, it makes sense to use the configured subscription's maximum ACK deadline for the streaming pull, instead of an optimistic minimum of 10 seconds.

Using an optimistic deadline affects messages that are put on hold and are not lease managed, because by the time the client dispatches them to the user's callback, the optimistic ACK deadline could have already been missed, resulting in the backend unnecessarily re-sending those messages, even if the subscription's ACK deadline has not been hit yet.

* Rename sub_future to subscription_future
1 parent 21cf56e commit 2bb22ae

File tree

3 files changed

+95
-22
lines changed

3 files changed

+95
-22
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def load(self):
208208
float: The load value.
209209
"""
210210
if self._leaser is None:
211-
return 0
211+
return 0.0
212212

213213
return max(
214214
[
@@ -384,14 +384,26 @@ def open(self, callback, on_callback_error):
384384
)
385385

386386
# Create the RPC
387+
subscription = self._client.api.get_subscription(self._subscription)
388+
stream_ack_deadline_seconds = subscription.ack_deadline_seconds
389+
390+
get_initial_request = functools.partial(
391+
self._get_initial_request, stream_ack_deadline_seconds
392+
)
387393
self._rpc = bidi.ResumableBidiRpc(
388394
start_rpc=self._client.api.streaming_pull,
389-
initial_request=self._get_initial_request,
395+
initial_request=get_initial_request,
390396
should_recover=self._should_recover,
391397
throttle_reopen=True,
392398
)
393399
self._rpc.add_done_callback(self._on_rpc_done)
394400

401+
_LOGGER.debug(
402+
"Creating a stream, default ACK deadline set to {} seconds.".format(
403+
stream_ack_deadline_seconds
404+
)
405+
)
406+
395407
# Create references to threads
396408
self._dispatcher = dispatcher.Dispatcher(self, self._scheduler.queue)
397409
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
@@ -462,12 +474,16 @@ def close(self, reason=None):
462474
for callback in self._close_callbacks:
463475
callback(self, reason)
464476

465-
def _get_initial_request(self):
477+
def _get_initial_request(self, stream_ack_deadline_seconds):
466478
"""Return the initial request for the RPC.
467479
468480
This defines the initial request that must always be sent to Pub/Sub
469481
immediately upon opening the subscription.
470482
483+
Args:
484+
stream_ack_deadline_seconds (int):
485+
The default message acknowledge deadline for the stream.
486+
471487
Returns:
472488
google.cloud.pubsub_v1.types.StreamingPullRequest: A request
473489
suitable for being the first request on the stream (and not
@@ -486,7 +502,7 @@ def _get_initial_request(self):
486502
request = types.StreamingPullRequest(
487503
modify_deadline_ack_ids=list(lease_ids),
488504
modify_deadline_seconds=[self.ack_deadline] * len(lease_ids),
489-
stream_ack_deadline_seconds=self.ack_histogram.percentile(99),
505+
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
490506
subscription=self._subscription,
491507
)
492508

@@ -511,14 +527,6 @@ def _on_response(self, response):
511527
self._messages_on_hold.qsize(),
512528
)
513529

514-
# Immediately modack the messages we received, as this tells the server
515-
# that we've received them.
516-
items = [
517-
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
518-
for message in response.received_messages
519-
]
520-
self._dispatcher.modify_ack_deadline(items)
521-
522530
invoke_callbacks_for = []
523531

524532
for received_message in response.received_messages:
@@ -535,6 +543,15 @@ def _on_response(self, response):
535543
else:
536544
self._messages_on_hold.put(message)
537545

546+
# Immediately (i.e. without waiting for the auto lease management)
547+
# modack the messages we received and not put on hold, as this tells
548+
# the server that we've received them.
549+
items = [
550+
requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99))
551+
for message in invoke_callbacks_for
552+
]
553+
self._dispatcher.modify_ack_deadline(items)
554+
538555
_LOGGER.debug(
539556
"Scheduling callbacks for %s new messages, new total on hold %s.",
540557
len(invoke_callbacks_for),

pubsub/tests/system.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,55 @@ class CallbackError(Exception):
381381
with pytest.raises(CallbackError):
382382
future.result(timeout=30)
383383

384+
def test_streaming_pull_ack_deadline(
385+
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
386+
):
387+
# Make sure the topic and subscription get deleted.
388+
cleanup.append((publisher.delete_topic, topic_path))
389+
cleanup.append((subscriber.delete_subscription, subscription_path))
390+
391+
# Create a topic and a subscription, then subscribe to the topic. This
392+
# must happen before the messages are published.
393+
publisher.create_topic(topic_path)
394+
395+
# Subscribe to the topic. This must happen before the messages
396+
# are published.
397+
subscriber.create_subscription(
398+
subscription_path, topic_path, ack_deadline_seconds=60
399+
)
400+
401+
# publish some messages and wait for completion
402+
self._publish_messages(publisher, topic_path, batch_sizes=[2])
403+
404+
# subscribe to the topic
405+
callback = StreamingPullCallback(
406+
processing_time=15, # more than the default ACK deadline of 10 seconds
407+
resolve_at_msg_count=3, # one more than the published messages count
408+
)
409+
flow_control = types.FlowControl(max_messages=1)
410+
subscription_future = subscriber.subscribe(
411+
subscription_path, callback, flow_control=flow_control
412+
)
413+
414+
# We expect to process the first two messages in 2 * 15 seconds, and
415+
# any duplicate message that is re-sent by the backend in additional
416+
# 15 seconds, totalling 45 seconds (+ overhead) --> if there have been
417+
# no duplicates in 60 seconds, we can reasonably assume that there
418+
# won't be any.
419+
try:
420+
callback.done_future.result(timeout=60)
421+
except exceptions.TimeoutError:
422+
# future timed out, because we received no excessive messages
423+
assert sorted(callback.seen_message_ids) == [1, 2]
424+
else:
425+
pytest.fail(
426+
"Expected to receive 2 messages, but got at least {}.".format(
427+
len(callback.seen_message_ids)
428+
)
429+
)
430+
finally:
431+
subscription_future.cancel()
432+
384433
def test_streaming_pull_max_messages(
385434
self, publisher, topic_path, subscriber, subscription_path, cleanup
386435
):

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,11 @@ def test_heartbeat_inactive():
405405
)
406406
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
407407
manager = make_manager()
408+
manager._client.api.get_subscription.return_value = types.Subscription(
409+
name="projects/foo/subscriptions/bar",
410+
topic="projects/foo/topics/baz",
411+
ack_deadline_seconds=123,
412+
)
408413

409414
manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)
410415

@@ -426,10 +431,14 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
426431

427432
resumable_bidi_rpc.assert_called_once_with(
428433
start_rpc=manager._client.api.streaming_pull,
429-
initial_request=manager._get_initial_request,
434+
initial_request=mock.ANY,
430435
should_recover=manager._should_recover,
431436
throttle_reopen=True,
432437
)
438+
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
439+
assert initial_request_arg.func == manager._get_initial_request
440+
assert initial_request_arg.args[0] == 123
441+
433442
resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
434443
manager._on_rpc_done
435444
)
@@ -574,11 +583,11 @@ def test__get_initial_request():
574583
manager._leaser = mock.create_autospec(leaser.Leaser, instance=True)
575584
manager._leaser.ack_ids = ["1", "2"]
576585

577-
initial_request = manager._get_initial_request()
586+
initial_request = manager._get_initial_request(123)
578587

579588
assert isinstance(initial_request, types.StreamingPullRequest)
580589
assert initial_request.subscription == "subscription-name"
581-
assert initial_request.stream_ack_deadline_seconds == 10
590+
assert initial_request.stream_ack_deadline_seconds == 123
582591
assert initial_request.modify_deadline_ack_ids == ["1", "2"]
583592
assert initial_request.modify_deadline_seconds == [10, 10]
584593

@@ -587,11 +596,11 @@ def test__get_initial_request_wo_leaser():
587596
manager = make_manager()
588597
manager._leaser = None
589598

590-
initial_request = manager._get_initial_request()
599+
initial_request = manager._get_initial_request(123)
591600

592601
assert isinstance(initial_request, types.StreamingPullRequest)
593602
assert initial_request.subscription == "subscription-name"
594-
assert initial_request.stream_ack_deadline_seconds == 10
603+
assert initial_request.stream_ack_deadline_seconds == 123
595604
assert initial_request.modify_deadline_ack_ids == []
596605
assert initial_request.modify_deadline_seconds == []
597606

@@ -660,12 +669,10 @@ def test__on_response_with_leaser_overload():
660669
# are called in the expected way.
661670
manager._on_response(response)
662671

672+
# only the messages that are added to the lease management and dispatched to
673+
# callbacks should have their ACK deadline extended
663674
dispatcher.modify_ack_deadline.assert_called_once_with(
664-
[
665-
requests.ModAckRequest("fack", 10),
666-
requests.ModAckRequest("back", 10),
667-
requests.ModAckRequest("zack", 10),
668-
]
675+
[requests.ModAckRequest("fack", 10)]
669676
)
670677

671678
# one message should be scheduled, the leaser capacity allows for it

0 commit comments

Comments
 (0)