Skip to content

Commit 80e2bde

Browse files
Allow timeout to work when reading with nowait (#5854)
(Note this depends on and extends #5853) When reading in a loop while the buffer is being constantly filled, the timeout does not work as there are no calls to `_wait()` where the timer is used. I don't know if this edge case is enough to be worried about, but have put together an initial attempt at fixing it. I'm not sure if this is really the right solution, but can atleast be used as as a discussion on ways to improve this. This can't be backported as this changes the public API (one of the functions is now async). Related #5851. --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 3187f6f commit 80e2bde

File tree

4 files changed

+37
-8
lines changed

4 files changed

+37
-8
lines changed

CHANGES/5854.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed client timeout not working when incoming data is always available without waiting -- by :user:`Dreamsorcerer`.

aiohttp/helpers.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,8 @@ def __call__(self) -> None:
676676

677677

678678
class BaseTimerContext(ContextManager["BaseTimerContext"]):
679-
pass
679+
def assert_timeout(self) -> None:
680+
"""Raise TimeoutError if timeout has been exceeded."""
680681

681682

682683
class TimerNoop(BaseTimerContext):
@@ -700,6 +701,11 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
700701
self._tasks: List[asyncio.Task[Any]] = []
701702
self._cancelled = False
702703

704+
def assert_timeout(self) -> None:
705+
"""Raise TimeoutError if timer has already been cancelled."""
706+
if self._cancelled:
707+
raise asyncio.TimeoutError from None
708+
703709
def __enter__(self) -> BaseTimerContext:
704710
task = asyncio.current_task(loop=self._loop)
705711

aiohttp/streams.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing_extensions import Final
77

88
from .base_protocol import BaseProtocol
9-
from .helpers import BaseTimerContext, set_exception, set_result
9+
from .helpers import BaseTimerContext, TimerNoop, set_exception, set_result
1010
from .log import internal_logger
1111

1212
try: # pragma: no cover
@@ -122,7 +122,7 @@ def __init__(
122122
self._waiter: Optional[asyncio.Future[None]] = None
123123
self._eof_waiter: Optional[asyncio.Future[None]] = None
124124
self._exception: Optional[BaseException] = None
125-
self._timer = timer
125+
self._timer = TimerNoop() if timer is None else timer
126126
self._eof_callbacks: List[Callable[[], None]] = []
127127

128128
def __repr__(self) -> str:
@@ -297,10 +297,7 @@ async def _wait(self, func_name: str) -> None:
297297

298298
waiter = self._waiter = self._loop.create_future()
299299
try:
300-
if self._timer:
301-
with self._timer:
302-
await waiter
303-
else:
300+
with self._timer:
304301
await waiter
305302
finally:
306303
self._waiter = None
@@ -477,8 +474,9 @@ def _read_nowait_chunk(self, n: int) -> bytes:
477474

478475
def _read_nowait(self, n: int) -> bytes:
479476
"""Read not more than n bytes, or whole buffer if n == -1"""
480-
chunks = []
477+
self._timer.assert_timeout()
481478

479+
chunks = []
482480
while self._buffer:
483481
chunk = self._read_nowait_chunk(n)
484482
chunks.append(chunk)

tests/test_client_functional.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3022,6 +3022,30 @@ async def handler(request):
30223022
await resp.read()
30233023

30243024

3025+
async def test_timeout_with_full_buffer(aiohttp_client: Any) -> None:
3026+
async def handler(request):
3027+
"""Server response that never ends and always has more data available."""
3028+
resp = web.StreamResponse()
3029+
await resp.prepare(request)
3030+
while True:
3031+
await resp.write(b"1" * 1000)
3032+
await asyncio.sleep(0.01)
3033+
3034+
async def request(client):
3035+
timeout = aiohttp.ClientTimeout(total=0.5)
3036+
async with await client.get("/", timeout=timeout) as resp:
3037+
with pytest.raises(asyncio.TimeoutError):
3038+
async for data in resp.content.iter_chunked(1):
3039+
await asyncio.sleep(0.01)
3040+
3041+
app = web.Application()
3042+
app.add_routes([web.get("/", handler)])
3043+
3044+
client = await aiohttp_client(app)
3045+
# wait_for() used just to ensure that a failing test doesn't hang.
3046+
await asyncio.wait_for(request(client), 1)
3047+
3048+
30253049
async def test_read_bufsize_session_default(aiohttp_client: Any) -> None:
30263050
async def handler(request):
30273051
return web.Response(body=b"1234567")

0 commit comments

Comments
 (0)