Skip to content

Commit 2439560

Browse files
authored
fix(pubsub): split large (mod)ACK requests into smaller ones (#9594)
* fix(pubsub): split large (mod)ACK requests into smaller ones There is a server-side limit on the maximum size of ACK and modACK requests, which can be hit if the leaser tries to manage too many messages in a single requests. This commit assures that such large requests are split into multiple smaller requests. * Decrease max ACK batch size to 2500 The previous limit of 3000 seems to be too optimistic, and the request size limit is still hit. Reducing the batch size to 2500 fixes the problem. * Add additional test assertions about sent ACK IDs The tests should also check that each message is (MOD)ACK-ed exactly once.
1 parent d4c440b commit 2439560

File tree

2 files changed

+94
-10
lines changed

2 files changed

+94
-10
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py

+37-10
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
# limitations under the License.
1414

1515
from __future__ import absolute_import
16+
from __future__ import division
1617

1718
import collections
19+
import itertools
1820
import logging
21+
import math
1922
import threading
2023

2124
from google.cloud.pubsub_v1 import types
@@ -34,6 +37,18 @@
3437
"""The maximum amount of time in seconds to wait for additional request items
3538
before processing the next batch of requests."""
3639

40+
_ACK_IDS_BATCH_SIZE = 2500
41+
"""The maximum number of ACK IDs to send in a single StreamingPullRequest.
42+
43+
The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per
44+
acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 164
45+
bytes, thus we cannot send more than o 524288/176 ~= 2979 ACK IDs in a single
46+
StreamingPullRequest message.
47+
48+
Accounting for some overhead, we should thus only send a maximum of 2500 ACK
49+
IDs at a time.
50+
"""
51+
3752

3853
class Dispatcher(object):
3954
def __init__(self, manager, queue):
@@ -119,9 +134,16 @@ def ack(self, items):
119134
if time_to_ack is not None:
120135
self._manager.ack_histogram.add(time_to_ack)
121136

122-
ack_ids = [item.ack_id for item in items]
123-
request = types.StreamingPullRequest(ack_ids=ack_ids)
124-
self._manager.send(request)
137+
# We must potentially split the request into multiple smaller requests
138+
# to avoid the server-side max request size limit.
139+
ack_ids = (item.ack_id for item in items)
140+
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))
141+
142+
for _ in range(total_chunks):
143+
request = types.StreamingPullRequest(
144+
ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE)
145+
)
146+
self._manager.send(request)
125147

126148
# Remove the message from lease management.
127149
self.drop(items)
@@ -150,13 +172,18 @@ def modify_ack_deadline(self, items):
150172
Args:
151173
items(Sequence[ModAckRequest]): The items to modify.
152174
"""
153-
ack_ids = [item.ack_id for item in items]
154-
seconds = [item.seconds for item in items]
155-
156-
request = types.StreamingPullRequest(
157-
modify_deadline_ack_ids=ack_ids, modify_deadline_seconds=seconds
158-
)
159-
self._manager.send(request)
175+
# We must potentially split the request into multiple smaller requests
176+
# to avoid the server-side max request size limit.
177+
ack_ids = (item.ack_id for item in items)
178+
seconds = (item.seconds for item in items)
179+
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))
180+
181+
for _ in range(total_chunks):
182+
request = types.StreamingPullRequest(
183+
modify_deadline_ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE),
184+
modify_deadline_seconds=itertools.islice(seconds, _ACK_IDS_BATCH_SIZE),
185+
)
186+
self._manager.send(request)
160187

161188
def nack(self, items):
162189
"""Explicitly deny receipt of messages.

pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py

+57
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import collections
1516
import threading
1617

1718
from google.cloud.pubsub_v1 import types
@@ -95,6 +96,34 @@ def test_ack_no_time():
9596
manager.ack_histogram.add.assert_not_called()
9697

9798

99+
def test_ack_splitting_large_payload():
100+
manager = mock.create_autospec(
101+
streaming_pull_manager.StreamingPullManager, instance=True
102+
)
103+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
104+
105+
items = [
106+
# use realistic lengths for ACK IDs (max 176 bytes)
107+
requests.AckRequest(ack_id=str(i).zfill(176), byte_size=0, time_to_ack=20)
108+
for i in range(5001)
109+
]
110+
dispatcher_.ack(items)
111+
112+
calls = manager.send.call_args_list
113+
assert len(calls) == 3
114+
115+
all_ack_ids = {item.ack_id for item in items}
116+
sent_ack_ids = collections.Counter()
117+
118+
for call in calls:
119+
message = call.args[0]
120+
assert message.ByteSize() <= 524288 # server-side limit (2**19)
121+
sent_ack_ids.update(message.ack_ids)
122+
123+
assert set(sent_ack_ids) == all_ack_ids # all messages should have been ACK-ed
124+
assert sent_ack_ids.most_common(1)[0][1] == 1 # each message ACK-ed exactly once
125+
126+
98127
def test_lease():
99128
manager = mock.create_autospec(
100129
streaming_pull_manager.StreamingPullManager, instance=True
@@ -153,6 +182,34 @@ def test_modify_ack_deadline():
153182
)
154183

155184

185+
def test_modify_ack_deadline_splitting_large_payload():
186+
manager = mock.create_autospec(
187+
streaming_pull_manager.StreamingPullManager, instance=True
188+
)
189+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
190+
191+
items = [
192+
# use realistic lengths for ACK IDs (max 176 bytes)
193+
requests.ModAckRequest(ack_id=str(i).zfill(176), seconds=60)
194+
for i in range(5001)
195+
]
196+
dispatcher_.modify_ack_deadline(items)
197+
198+
calls = manager.send.call_args_list
199+
assert len(calls) == 3
200+
201+
all_ack_ids = {item.ack_id for item in items}
202+
sent_ack_ids = collections.Counter()
203+
204+
for call in calls:
205+
message = call.args[0]
206+
assert message.ByteSize() <= 524288 # server-side limit (2**19)
207+
sent_ack_ids.update(message.modify_deadline_ack_ids)
208+
209+
assert set(sent_ack_ids) == all_ack_ids # all messages should have been MODACK-ed
210+
assert sent_ack_ids.most_common(1)[0][1] == 1 # each message MODACK-ed exactly once
211+
212+
156213
@mock.patch("threading.Thread", autospec=True)
157214
def test_start(thread):
158215
manager = mock.create_autospec(

0 commit comments

Comments
 (0)