Skip to content

Commit c4acabc

Browse files
Fix timer handle churn in websocket heartbeat (#8608)
Co-authored-by: Sam Bull <[email protected]>
1 parent b2691f2 commit c4acabc

File tree

7 files changed

318 additions and 88 deletions

7 files changed

318 additions and 88 deletions

CHANGES/8608.misc.rst

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,3 @@
1+
Improved websocket performance when messages are sent or received frequently -- by :user:`bdraco`.
2+
3+
The WebSocket heartbeat scheduling algorithm was improved to reduce the ``asyncio`` scheduling overhead by decreasing the number of ``asyncio.TimerHandle`` creations and cancellations.

aiohttp/client_ws.py

Lines changed: 72 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@
77

88
from .client_exceptions import ClientError, ServerTimeoutError
99
from .client_reqrep import ClientResponse
10-
from .helpers import call_later, set_result
10+
from .helpers import calculate_timeout_when, set_result
1111
from .http import (
1212
WS_CLOSED_MESSAGE,
1313
WS_CLOSING_MESSAGE,
@@ -72,6 +72,7 @@ def __init__(
7272
self._autoping = autoping
7373
self._heartbeat = heartbeat
7474
self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
75+
self._heartbeat_when: float = 0.0
7576
if heartbeat is not None:
7677
self._pong_heartbeat = heartbeat / 2.0
7778
self._pong_response_cb: Optional[asyncio.TimerHandle] = None
@@ -85,52 +86,64 @@ def __init__(
8586
self._reset_heartbeat()
8687

8788
def _cancel_heartbeat(self) -> None:
88-
if self._pong_response_cb is not None:
89-
self._pong_response_cb.cancel()
90-
self._pong_response_cb = None
91-
89+
self._cancel_pong_response_cb()
9290
if self._heartbeat_cb is not None:
9391
self._heartbeat_cb.cancel()
9492
self._heartbeat_cb = None
9593

96-
def _reset_heartbeat(self) -> None:
97-
self._cancel_heartbeat()
94+
def _cancel_pong_response_cb(self) -> None:
95+
if self._pong_response_cb is not None:
96+
self._pong_response_cb.cancel()
97+
self._pong_response_cb = None
9898

99-
if self._heartbeat is not None:
100-
self._heartbeat_cb = call_later(
101-
self._send_heartbeat,
102-
self._heartbeat,
103-
self._loop,
104-
timeout_ceil_threshold=(
105-
self._conn._connector._timeout_ceil_threshold
106-
if self._conn is not None
107-
else 5
108-
),
109-
)
99+
def _reset_heartbeat(self) -> None:
100+
if self._heartbeat is None:
101+
return
102+
self._cancel_pong_response_cb()
103+
loop = self._loop
104+
assert loop is not None
105+
conn = self._conn
106+
timeout_ceil_threshold = (
107+
conn._connector._timeout_ceil_threshold if conn is not None else 5
108+
)
109+
now = loop.time()
110+
when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
111+
self._heartbeat_when = when
112+
if self._heartbeat_cb is None:
113+
# We do not cancel the previous heartbeat_cb here because
114+
# it generates a significant amount of TimerHandle churn
115+
# which causes asyncio to rebuild the heap frequently.
116+
# Instead _send_heartbeat() will reschedule the next
117+
# heartbeat if it fires too early.
118+
self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)
110119

111120
def _send_heartbeat(self) -> None:
112-
if self._heartbeat is not None and not self._closed:
113-
# fire-and-forget a task is not perfect but maybe ok for
114-
# sending ping. Otherwise we need a long-living heartbeat
115-
# task in the class.
116-
self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]
117-
118-
if self._pong_response_cb is not None:
119-
self._pong_response_cb.cancel()
120-
self._pong_response_cb = call_later(
121-
self._pong_not_received,
122-
self._pong_heartbeat,
123-
self._loop,
124-
timeout_ceil_threshold=(
125-
self._conn._connector._timeout_ceil_threshold
126-
if self._conn is not None
127-
else 5
128-
),
121+
self._heartbeat_cb = None
122+
loop = self._loop
123+
now = loop.time()
124+
if now < self._heartbeat_when:
125+
# Heartbeat fired too early, reschedule
126+
self._heartbeat_cb = loop.call_at(
127+
self._heartbeat_when, self._send_heartbeat
129128
)
129+
return
130+
131+
# fire-and-forget a task is not perfect but maybe ok for
132+
# sending ping. Otherwise we need a long-living heartbeat
133+
# task in the class.
134+
loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]
135+
136+
conn = self._conn
137+
timeout_ceil_threshold = (
138+
conn._connector._timeout_ceil_threshold if conn is not None else 5
139+
)
140+
when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold)
141+
self._cancel_pong_response_cb()
142+
self._pong_response_cb = loop.call_at(when, self._pong_not_received)
130143

131144
def _pong_not_received(self) -> None:
132145
if not self._closed:
133-
self._closed = True
146+
self._set_closed()
134147
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
135148
self._exception = ServerTimeoutError()
136149
self._response.close()
@@ -139,6 +152,22 @@ def _pong_not_received(self) -> None:
139152
WSMessage(WSMsgType.ERROR, self._exception, None)
140153
)
141154

155+
def _set_closed(self) -> None:
156+
"""Set the connection to closed.
157+
158+
Cancel any heartbeat timers and set the closed flag.
159+
"""
160+
self._closed = True
161+
self._cancel_heartbeat()
162+
163+
def _set_closing(self) -> None:
164+
"""Set the connection to closing.
165+
166+
Cancel any heartbeat timers and set the closing flag.
167+
"""
168+
self._closing = True
169+
self._cancel_heartbeat()
170+
142171
@property
143172
def closed(self) -> bool:
144173
return self._closed
@@ -203,13 +232,12 @@ async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bo
203232
if self._waiting and not self._closing:
204233
assert self._loop is not None
205234
self._close_wait = self._loop.create_future()
206-
self._closing = True
235+
self._set_closing()
207236
self._reader.feed_data(WS_CLOSING_MESSAGE)
208237
await self._close_wait
209238

210239
if not self._closed:
211-
self._cancel_heartbeat()
212-
self._closed = True
240+
self._set_closed()
213241
try:
214242
await self._writer.close(code, message)
215243
except asyncio.CancelledError:
@@ -278,7 +306,8 @@ async def receive(self, timeout: Optional[float] = None) -> WSMessage:
278306
await self.close()
279307
return WSMessage(WSMsgType.CLOSED, None, None)
280308
except ClientError:
281-
self._closed = True
309+
# Likely ServerDisconnectedError when connection is lost
310+
self._set_closed()
282311
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
283312
return WS_CLOSED_MESSAGE
284313
except WebSocketError as exc:
@@ -287,19 +316,19 @@ async def receive(self, timeout: Optional[float] = None) -> WSMessage:
287316
return WSMessage(WSMsgType.ERROR, exc, None)
288317
except Exception as exc:
289318
self._exception = exc
290-
self._closing = True
319+
self._set_closing()
291320
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
292321
await self.close()
293322
return WSMessage(WSMsgType.ERROR, exc, None)
294323

295324
if msg.type is WSMsgType.CLOSE:
296-
self._closing = True
325+
self._set_closing()
297326
self._close_code = msg.data
298327
# Could be closed elsewhere while awaiting reader
299328
if not self._closed and self._autoclose: # type: ignore[redundant-expr]
300329
await self.close()
301330
elif msg.type is WSMsgType.CLOSING:
302-
self._closing = True
331+
self._set_closing()
303332
elif msg.type is WSMsgType.PING and self._autoping:
304333
await self.pong(msg.data)
305334
continue

aiohttp/helpers.py

Lines changed: 17 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -598,12 +598,23 @@ def call_later(
598598
loop: asyncio.AbstractEventLoop,
599599
timeout_ceil_threshold: float = 5,
600600
) -> Optional[asyncio.TimerHandle]:
601-
if timeout is not None and timeout > 0:
602-
when = loop.time() + timeout
603-
if timeout > timeout_ceil_threshold:
604-
when = ceil(when)
605-
return loop.call_at(when, cb)
606-
return None
601+
if timeout is None or timeout <= 0:
602+
return None
603+
now = loop.time()
604+
when = calculate_timeout_when(now, timeout, timeout_ceil_threshold)
605+
return loop.call_at(when, cb)
606+
607+
608+
def calculate_timeout_when(
609+
loop_time: float,
610+
timeout: float,
611+
timeout_ceiling_threshold: float,
612+
) -> float:
613+
"""Calculate when to execute a timeout."""
614+
when = loop_time + timeout
615+
if timeout > timeout_ceiling_threshold:
616+
return ceil(when)
617+
return when
607618

608619

609620
class TimeoutHandle:

aiohttp/web_ws.py

Lines changed: 62 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111

1212
from . import hdrs
1313
from .abc import AbstractStreamWriter
14-
from .helpers import call_later, set_exception, set_result
14+
from .helpers import calculate_timeout_when, set_exception, set_result
1515
from .http import (
1616
WS_CLOSED_MESSAGE,
1717
WS_CLOSING_MESSAGE,
@@ -74,6 +74,7 @@ class WebSocketResponse(StreamResponse):
7474
"_autoclose",
7575
"_autoping",
7676
"_heartbeat",
77+
"_heartbeat_when",
7778
"_heartbeat_cb",
7879
"_pong_heartbeat",
7980
"_pong_response_cb",
@@ -112,6 +113,7 @@ def __init__(
112113
self._autoclose = autoclose
113114
self._autoping = autoping
114115
self._heartbeat = heartbeat
116+
self._heartbeat_when = 0.0
115117
self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
116118
if heartbeat is not None:
117119
self._pong_heartbeat = heartbeat / 2.0
@@ -120,57 +122,76 @@ def __init__(
120122
self._max_msg_size = max_msg_size
121123

122124
def _cancel_heartbeat(self) -> None:
123-
if self._pong_response_cb is not None:
124-
self._pong_response_cb.cancel()
125-
self._pong_response_cb = None
126-
125+
self._cancel_pong_response_cb()
127126
if self._heartbeat_cb is not None:
128127
self._heartbeat_cb.cancel()
129128
self._heartbeat_cb = None
130129

131-
def _reset_heartbeat(self) -> None:
132-
self._cancel_heartbeat()
130+
def _cancel_pong_response_cb(self) -> None:
131+
if self._pong_response_cb is not None:
132+
self._pong_response_cb.cancel()
133+
self._pong_response_cb = None
133134

134-
if self._heartbeat is not None:
135-
assert self._loop is not None
136-
self._heartbeat_cb = call_later(
137-
self._send_heartbeat,
138-
self._heartbeat,
139-
self._loop,
140-
timeout_ceil_threshold=(
141-
self._req._protocol._timeout_ceil_threshold
142-
if self._req is not None
143-
else 5
144-
),
145-
)
135+
def _reset_heartbeat(self) -> None:
136+
if self._heartbeat is None:
137+
return
138+
self._cancel_pong_response_cb()
139+
req = self._req
140+
timeout_ceil_threshold = (
141+
req._protocol._timeout_ceil_threshold if req is not None else 5
142+
)
143+
loop = self._loop
144+
assert loop is not None
145+
now = loop.time()
146+
when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
147+
self._heartbeat_when = when
148+
if self._heartbeat_cb is None:
149+
# We do not cancel the previous heartbeat_cb here because
150+
# it generates a significant amount of TimerHandle churn
151+
# which causes asyncio to rebuild the heap frequently.
152+
# Instead _send_heartbeat() will reschedule the next
153+
# heartbeat if it fires too early.
154+
self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)
146155

147156
def _send_heartbeat(self) -> None:
148-
if self._heartbeat is not None and not self._closed:
149-
assert self._loop is not None and self._writer is not None
150-
# fire-and-forget a task is not perfect but maybe ok for
151-
# sending ping. Otherwise we need a long-living heartbeat
152-
# task in the class.
153-
self._loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]
154-
155-
if self._pong_response_cb is not None:
156-
self._pong_response_cb.cancel()
157-
self._pong_response_cb = call_later(
158-
self._pong_not_received,
159-
self._pong_heartbeat,
160-
self._loop,
161-
timeout_ceil_threshold=(
162-
self._req._protocol._timeout_ceil_threshold
163-
if self._req is not None
164-
else 5
165-
),
157+
self._heartbeat_cb = None
158+
loop = self._loop
159+
assert loop is not None and self._writer is not None
160+
now = loop.time()
161+
if now < self._heartbeat_when:
162+
# Heartbeat fired too early, reschedule
163+
self._heartbeat_cb = loop.call_at(
164+
self._heartbeat_when, self._send_heartbeat
166165
)
166+
return
167+
168+
# fire-and-forget a task is not perfect but maybe ok for
169+
# sending ping. Otherwise we need a long-living heartbeat
170+
# task in the class.
171+
loop.create_task(self._writer.ping()) # type: ignore[unused-awaitable]
172+
173+
req = self._req
174+
timeout_ceil_threshold = (
175+
req._protocol._timeout_ceil_threshold if req is not None else 5
176+
)
177+
when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold)
178+
self._cancel_pong_response_cb()
179+
self._pong_response_cb = loop.call_at(when, self._pong_not_received)
167180

168181
def _pong_not_received(self) -> None:
169182
if self._req is not None and self._req.transport is not None:
170-
self._closed = True
183+
self._set_closed()
171184
self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
172185
self._exception = asyncio.TimeoutError()
173186

187+
def _set_closed(self) -> None:
188+
"""Set the connection to closed.
189+
190+
Cancel any heartbeat timers and set the closed flag.
191+
"""
192+
self._closed = True
193+
self._cancel_heartbeat()
194+
174195
async def prepare(self, request: BaseRequest) -> AbstractStreamWriter:
175196
# make pre-check to don't hide it by do_handshake() exceptions
176197
if self._payload_writer is not None:
@@ -410,7 +431,7 @@ async def close(
410431
if self._closed:
411432
return False
412433

413-
self._closed = True
434+
self._set_closed()
414435
try:
415436
await self._writer.close(code, message)
416437
writer = self._payload_writer
@@ -454,6 +475,7 @@ def _set_closing(self, code: WSCloseCode) -> None:
454475
"""Set the close code and mark the connection as closing."""
455476
self._closing = True
456477
self._close_code = code
478+
self._cancel_heartbeat()
457479

458480
def _set_code_close_transport(self, code: WSCloseCode) -> None:
459481
"""Set the close code and close the transport."""
@@ -566,5 +588,6 @@ def _cancel(self, exc: BaseException) -> None:
566588
# web_protocol calls this from connection_lost
567589
# or when the server is shutting down.
568590
self._closing = True
591+
self._cancel_heartbeat()
569592
if self._reader is not None:
570593
set_exception(self._reader, exc)

0 commit comments

Comments
 (0)