Skip to content

Commit 06085c4

Browse files
authored
feat: add flow control for message publishing (googleapis#96)
* feat: add publish flow control settings * Add flow control logic to publisher client * Add flow control support for multiple add() threads * Raise publish flow control errors through futures * Include load info in debug log messages * Remove incorrect comment in a test * Remove comment about an error not directly raised * Remove redundant check for reservation existence * Change exception for publishing too large a message * Add internal sanity check for byte reservations * Reword the docstring on flow control limits error
1 parent cf9e87c commit 06085c4

File tree

6 files changed

+837
-23
lines changed

6 files changed

+837
-23
lines changed

google/cloud/pubsub_v1/publisher/client.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@
3131
from google.cloud.pubsub_v1 import types
3232
from google.cloud.pubsub_v1.gapic import publisher_client
3333
from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport
34+
from google.cloud.pubsub_v1.publisher import exceptions
35+
from google.cloud.pubsub_v1.publisher import futures
3436
from google.cloud.pubsub_v1.publisher._batch import thread
3537
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
3638
from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
39+
from google.cloud.pubsub_v1.publisher.flow_controller import FlowController
3740

3841
__version__ = pkg_resources.get_distribution("google-cloud-pubsub").version
3942

@@ -93,7 +96,11 @@ class Client(object):
9396
9497
# Optional
9598
publisher_options = pubsub_v1.types.PublisherOptions(
96-
enable_message_ordering=False
99+
enable_message_ordering=False,
100+
flow_control=pubsub_v1.types.PublishFlowControl(
101+
message_limit=2000,
102+
limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
103+
),
97104
),
98105
99106
# Optional
@@ -198,6 +205,9 @@ def __init__(self, batch_settings=(), publisher_options=(), **kwargs):
198205
# Thread created to commit all sequencers after a timeout.
199206
self._commit_thread = None
200207

208+
# The object controlling the message publishing flow
209+
self._flow_controller = FlowController(self.publisher_options.flow_control)
210+
201211
@classmethod
202212
def from_service_account_file(cls, filename, batch_settings=(), **kwargs):
203213
"""Creates an instance of this client using the provided credentials
@@ -364,6 +374,18 @@ def publish(self, topic, data, ordering_key="", **attrs):
364374
data=data, ordering_key=ordering_key, attributes=attrs
365375
)
366376

377+
# Messages should go through flow control to prevent excessive
378+
# queuing on the client side (depending on the settings).
379+
try:
380+
self._flow_controller.add(message)
381+
except exceptions.FlowControlLimitError as exc:
382+
future = futures.Future()
383+
future.set_exception(exc)
384+
return future
385+
386+
def on_publish_done(future):
387+
self._flow_controller.release(message)
388+
367389
with self._batch_lock:
368390
if self._is_stopped:
369391
raise RuntimeError("Cannot publish on a stopped publisher.")
@@ -372,6 +394,7 @@ def publish(self, topic, data, ordering_key="", **attrs):
372394

373395
# Delegate the publishing to the sequencer.
374396
future = sequencer.publish(message)
397+
future.add_done_callback(on_publish_done)
375398

376399
# Create a timer thread if necessary to enforce the batching
377400
# timeout.

google/cloud/pubsub_v1/publisher/exceptions.py

+5
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@ def __init__(self, ordering_key):
3838
super(PublishToPausedOrderingKeyException, self).__init__()
3939

4040

41+
class FlowControlLimitError(Exception):
    """An action resulted in exceeding the flow control limits.

    The publisher's flow controller raises this error when a message is
    rejected because accepting it would exceed the configured limits, or
    because the message could never fit within those limits at all.
    """
43+
44+
4145
__all__ = (
46+
"FlowControlLimitError",
4247
"MessageTooLargeError",
4348
"PublishError",
4449
"TimeoutError",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
# Copyright 2020, Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from collections import deque
16+
import logging
17+
import threading
18+
import warnings
19+
20+
from google.cloud.pubsub_v1 import types
21+
from google.cloud.pubsub_v1.publisher import exceptions
22+
23+
24+
_LOGGER = logging.getLogger(__name__)
25+
26+
27+
class _QuantityReservation(object):
28+
"""A (partial) reservation of a quantifiable resource."""
29+
30+
def __init__(self, reserved, needed):
31+
self.reserved = reserved
32+
self.needed = needed
33+
34+
35+
class FlowController(object):
    """A class used to control the flow of messages passing through it.

    The controller tracks the number and the total size of the messages that
    were added to it, but not yet released. When accepting a new message
    would exceed the configured limits, it reacts according to the configured
    limit exceeded behavior (ignore, raise an error, or block the caller).

    Args:
        settings (~google.cloud.pubsub_v1.types.PublishFlowControl):
            Desired flow control configuration.
    """

    def __init__(self, settings):
        self._settings = settings

        # Load statistics. They represent the number of messages added, but not
        # yet released (and their total size).
        self._message_count = 0
        self._total_bytes = 0

        # A FIFO queue of threads blocked on adding a message, from first to last.
        # Only relevant if the configured limit exceeded behavior is BLOCK.
        self._waiting = deque()

        # Reservations of available flow control bytes by the waiting threads.
        # Each value is a _QuantityReservation instance.
        self._byte_reservations = dict()
        self._reserved_bytes = 0

        # The lock is used to protect all internal state (message and byte count,
        # waiting threads to add, etc.).
        self._operational_lock = threading.Lock()

        # The condition for blocking the flow if capacity is exceeded.
        self._has_capacity = threading.Condition(lock=self._operational_lock)

    def add(self, message):
        """Add a message to flow control.

        Adding a message updates the internal load statistics, and an action is
        taken if these limits are exceeded (depending on the flow control settings).

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message entering the flow control.

        Raises:
            :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
                Raised when the desired action is
                :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR` and
                the message would exceed flow control limits, or when the desired action
                is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK` and
                the message would block forever against the flow control limits.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        with self._operational_lock:
            if not self._would_overflow(message):
                self._message_count += 1
                self._total_bytes += message.ByteSize()
                return

            # Adding a message would overflow, react.
            if (
                self._settings.limit_exceeded_behavior
                == types.LimitExceededBehavior.ERROR
            ):
                # Raising an error means rejecting a message, thus we do not
                # add anything to the existing load, but we do report the would-be
                # load if we accepted the message.
                load_info = self._load_info(
                    message_count=self._message_count + 1,
                    total_bytes=self._total_bytes + message.ByteSize(),
                )
                error_msg = "Flow control limits would be exceeded - {}.".format(
                    load_info
                )
                raise exceptions.FlowControlLimitError(error_msg)

            assert (
                self._settings.limit_exceeded_behavior
                == types.LimitExceededBehavior.BLOCK
            )

            # Sanity check - if a message exceeds total flow control limits all
            # by itself, it would block forever, thus raise error.
            if (
                message.ByteSize() > self._settings.byte_limit
                or self._settings.message_limit < 1
            ):
                load_info = self._load_info(
                    message_count=1, total_bytes=message.ByteSize()
                )
                error_msg = (
                    "Total flow control limits too low for the message, "
                    "would block forever - {}.".format(load_info)
                )
                raise exceptions.FlowControlLimitError(error_msg)

            current_thread = threading.current_thread()

            try:
                while self._would_overflow(message):
                    if current_thread not in self._byte_reservations:
                        # First time blocking - join the end of the FIFO queue
                        # with an empty reservation that release() fills up.
                        self._waiting.append(current_thread)
                        self._byte_reservations[current_thread] = _QuantityReservation(
                            reserved=0, needed=message.ByteSize()
                        )

                    _LOGGER.debug(
                        "Blocking until there is enough free capacity in the flow - "
                        "%s.",
                        self._load_info(),
                    )

                    self._has_capacity.wait()

                    _LOGGER.debug(
                        "Woke up from waiting on free capacity in the flow - %s.",
                        self._load_info(),
                    )
            except BaseException:
                # The wait was interrupted (e.g. by KeyboardInterrupt). CPython's
                # Condition.wait() re-acquires the lock in a ``finally`` before
                # the exception propagates, so the state can be safely modified.
                # Without this cleanup the thread would stay in the waiting queue
                # forever and its reserved bytes would never be returned.
                self._drop_reservation(current_thread)
                self._distribute_available_bytes()
                if self._ready_to_unblock():
                    self._has_capacity.notify_all()
                raise

            # Message accepted, increase the load and remove thread stats.
            self._message_count += 1
            self._total_bytes += message.ByteSize()
            self._drop_reservation(current_thread)

    def release(self, message):
        """Release a message from flow control.

        Releasing decreases the tracked load, hands any freed bytes to the
        threads blocked in :meth:`add` (in FIFO order), and wakes them up if
        the first one in the queue can proceed.

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message leaving the flow control.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        with self._operational_lock:
            # Releasing a message decreases the load.
            self._message_count -= 1
            self._total_bytes -= message.ByteSize()

            if self._message_count < 0 or self._total_bytes < 0:
                warnings.warn(
                    "Releasing a message that was never added or already released.",
                    category=RuntimeWarning,
                    stacklevel=2,
                )
                # Clamp the statistics so that they never go negative.
                self._message_count = max(0, self._message_count)
                self._total_bytes = max(0, self._total_bytes)

            self._distribute_available_bytes()

            # If at least one thread waiting to add() can be unblocked, wake them up.
            if self._ready_to_unblock():
                _LOGGER.debug("Notifying threads waiting to add messages to flow.")
                self._has_capacity.notify_all()

    def _drop_reservation(self, thread):
        """Remove a thread's byte reservation and its place in the waiting queue.

        Does nothing if the thread holds no reservation.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            thread (threading.Thread):
                The thread whose waiting state should be discarded.
        """
        reservation = self._byte_reservations.pop(thread, None)
        if reservation is None:
            return

        # Return the bytes held by the reservation to the common pool.
        self._reserved_bytes -= reservation.reserved
        self._waiting.remove(thread)

    def _distribute_available_bytes(self):
        """Distribute available free capacity among the waiting threads in FIFO order.

        The method assumes that the caller has obtained ``_operational_lock``.
        """
        available = self._settings.byte_limit - self._total_bytes - self._reserved_bytes

        for thread in self._waiting:
            if available <= 0:
                break

            reservation = self._byte_reservations[thread]
            still_needed = reservation.needed - reservation.reserved

            # Sanity check for any internal inconsistencies.
            if still_needed < 0:
                msg = "Too many bytes reserved: {} / {}".format(
                    reservation.reserved, reservation.needed
                )
                warnings.warn(msg, category=RuntimeWarning)
                still_needed = 0

            # Give the thread as much as it still needs, or whatever is left.
            can_give = min(still_needed, available)
            reservation.reserved += can_give
            self._reserved_bytes += can_give
            available -= can_give

    def _ready_to_unblock(self):
        """Determine if any of the threads waiting to add a message can proceed.

        The method assumes that the caller has obtained ``_operational_lock``.

        Returns:
            bool
        """
        if self._waiting:
            # It's enough to only check the head of the queue, because FIFO
            # distribution of any free capacity.
            reservation = self._byte_reservations[self._waiting[0]]
            return (
                reservation.reserved >= reservation.needed
                and self._message_count < self._settings.message_limit
            )

        return False

    def _would_overflow(self, message):
        """Determine if accepting a message would exceed flow control limits.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message entering the flow control.

        Returns:
            bool
        """
        reservation = self._byte_reservations.get(threading.current_thread())

        if reservation:
            enough_reserved = reservation.reserved >= reservation.needed
        else:
            enough_reserved = False

        # A fully reserved message cannot overflow on size, because its bytes
        # are already accounted for in ``_reserved_bytes``.
        bytes_taken = self._total_bytes + self._reserved_bytes + message.ByteSize()
        size_overflow = bytes_taken > self._settings.byte_limit and not enough_reserved
        msg_count_overflow = self._message_count + 1 > self._settings.message_limit

        return size_overflow or msg_count_overflow

    def _load_info(self, message_count=None, total_bytes=None, reserved_bytes=None):
        """Return the current flow control load information.

        The caller can optionally adjust some of the values to fit its reporting
        needs.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message_count (Optional[int]):
                The value to override the current message count with.
            total_bytes (Optional[int]):
                The value to override the current total bytes with.
            reserved_bytes (Optional[int]):
                The value to override the current number of reserved bytes with.

        Returns:
            str
        """
        msg = "messages: {} / {}, bytes: {} / {} (reserved: {})"

        if message_count is None:
            message_count = self._message_count

        if total_bytes is None:
            total_bytes = self._total_bytes

        if reserved_bytes is None:
            reserved_bytes = self._reserved_bytes

        return msg.format(
            message_count,
            self._settings.message_limit,
            total_bytes,
            self._settings.byte_limit,
            reserved_bytes,
        )

0 commit comments

Comments
 (0)