-
-
Notifications
You must be signed in to change notification settings - Fork 357
add @as_safe_channel
#3197
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add @as_safe_channel
#3197
Changes from 29 commits
317cb1c
1a7714e
b0b8b02
8584cff
274755f
428dd4b
7542973
86d3b0f
2d11ea2
a5734f6
0b461d2
b86eb54
7936fd2
6e71d4e
1670674
7acf3a0
69a95dc
efe2d00
f78f641
5bfb0c5
54800e2
0e34b85
1b8ce0a
ec61a1f
9f8a2ab
bfa981c
26ed1c6
72ac535
e7556b4
da8a415
0030ac8
e9596a2
b076e6d
ba9c1d2
58e03c9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add :func:`@trio.background_with_channel <trio.background_with_channel>`, a wrapper that can be used to make async generators safe. This will be the suggested fix for `ASYNC900 <https://flake8-async.readthedocs.io/en/latest/rules.html#async900>`_. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,10 @@ | ||
from __future__ import annotations | ||
|
||
import sys | ||
from collections import OrderedDict, deque | ||
from collections.abc import AsyncGenerator, Callable # noqa: TC003 # Needed for Sphinx | ||
from contextlib import AbstractAsyncContextManager, asynccontextmanager | ||
from functools import wraps | ||
from math import inf | ||
from typing import ( | ||
TYPE_CHECKING, | ||
|
@@ -14,12 +18,30 @@ | |
|
||
from ._abc import ReceiveChannel, ReceiveType, SendChannel, SendType, T | ||
from ._core import Abort, RaiseCancelT, Task, enable_ki_protection | ||
from ._util import NoPublicConstructor, final, generic_function | ||
from ._util import ( | ||
NoPublicConstructor, | ||
final, | ||
generic_function, | ||
raise_single_exception_from_group, | ||
) | ||
|
||
if sys.version_info < (3, 11): | ||
from exceptiongroup import BaseExceptionGroup | ||
|
||
if TYPE_CHECKING: | ||
from types import TracebackType | ||
|
||
from typing_extensions import Self | ||
from typing_extensions import ParamSpec, Self | ||
|
||
P = ParamSpec("P") | ||
elif "sphinx" in sys.modules: | ||
# P needs to exist for Sphinx to parse the type hints successfully. | ||
try: | ||
from typing_extensions import ParamSpec | ||
except ImportError: | ||
P = ... # This is valid in Callable, though not correct | ||
else: | ||
P = ParamSpec("P") | ||
|
||
|
||
def _open_memory_channel( | ||
|
@@ -440,3 +462,132 @@ async def aclose(self) -> None: | |
See `MemoryReceiveChannel.close`.""" | ||
self.close() | ||
await trio.lowlevel.checkpoint() | ||
|
||
|
||
class RecvChanWrapper(ReceiveChannel[T]): | ||
def __init__( | ||
self, recv_chan: MemoryReceiveChannel[T], send_semaphore: trio.Semaphore | ||
) -> None: | ||
self.recv_chan = recv_chan | ||
self.send_semaphore = send_semaphore | ||
|
||
# TODO: should this allow clones? We'd signal that by inheriting from | ||
# MemoryReceiveChannel. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest we ship an initial version without There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was mostly thinking if we have any strong reasons not to offer it, implementing it should be straightforward. |
||
|
||
async def receive(self) -> T: | ||
self.send_semaphore.release() | ||
return await self.recv_chan.receive() | ||
Zac-HD marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
async def aclose(self) -> None: | ||
await self.recv_chan.aclose() | ||
|
||
def __enter__(self) -> Self: | ||
return self | ||
|
||
def __exit__( | ||
self, | ||
exc_type: type[BaseException] | None, | ||
exc_value: BaseException | None, | ||
traceback: TracebackType | None, | ||
) -> None: | ||
self.recv_chan.close() | ||
|
||
|
||
def background_with_channel( | ||
Zac-HD marked this conversation as resolved.
Show resolved
Hide resolved
|
||
fn: Callable[P, AsyncGenerator[T, None]], | ||
) -> Callable[P, AbstractAsyncContextManager[ReceiveChannel[T]]]: | ||
"""Decorate an async generator function to make it cancellation-safe. | ||
|
||
This is mostly a drop-in replacement, except for the fact that it will | ||
wrap errors in exception groups due to the internal nursery. Although when | ||
using it without a buffer it should be exceedingly rare to get multiple | ||
exceptions. | ||
jakkdl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
The ``yield`` keyword offers a very convenient way to write iterators... | ||
which makes it really unfortunate that async generators are so difficult | ||
to call correctly. Yielding from the inside of a cancel scope or a nursery | ||
to the outside `violates structured concurrency <https://xkcd.com/292/>`_ | ||
with consequences explained in :pep:`789`. Even then, resource cleanup | ||
errors remain common (:pep:`533`) unless you wrap every call in | ||
:func:`~contextlib.aclosing`. | ||
|
||
This decorator gives you the best of both worlds: with careful exception | ||
handling and a background task we preserve structured concurrency by | ||
offering only the safe interface, and you can still write your iterables | ||
with the convenience of ``yield``. For example:: | ||
|
||
@background_with_channel | ||
async def my_async_iterable(arg, *, kwarg=True): | ||
while ...: | ||
item = await ... | ||
yield item | ||
|
||
async with my_async_iterable(...) as recv_chan: | ||
async for item in recv_chan: | ||
... | ||
|
||
While the combined async-with-async-for can be inconvenient at first, | ||
the context manager is indispensable for both correctness and for prompt | ||
cleanup of resources. | ||
|
||
If you specify ``max_buffer_size>0`` the async generator will run concurrently | ||
with your iterator, until the buffer is full. | ||
Zac-HD marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
jakkdl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Perhaps a future PEP will adopt `async with for` syntax, like | ||
# https://coconut.readthedocs.io/en/master/DOCS.html#async-with-for | ||
|
||
@asynccontextmanager | ||
@wraps(fn) | ||
async def context_manager( | ||
*args: P.args, **kwargs: P.kwargs | ||
) -> AsyncGenerator[trio._channel.RecvChanWrapper[T], None]: | ||
send_chan, recv_chan = trio.open_memory_channel[T](0) | ||
try: | ||
async with trio.open_nursery(strict_exception_groups=True) as nursery: | ||
agen = fn(*args, **kwargs) | ||
send_semaphore = trio.Semaphore(0) | ||
# `nursery.start` to make sure that we will clean up send_chan & agen | ||
# If this errors we don't close `recv_chan`, but the caller | ||
# never gets access to it, so that's not a problem. | ||
await nursery.start( | ||
_move_elems_to_channel, agen, send_chan, send_semaphore | ||
) | ||
# `async with recv_chan` could eat exceptions, so use sync cm | ||
with RecvChanWrapper(recv_chan, send_semaphore) as wrapped_recv_chan: | ||
yield wrapped_recv_chan | ||
# User has exited context manager, cancel to immediately close the | ||
# abandoned generator if it's still alive. | ||
nursery.cancel_scope.cancel() | ||
except BaseExceptionGroup as eg: | ||
try: | ||
raise_single_exception_from_group(eg) | ||
Zac-HD marked this conversation as resolved.
Show resolved
Hide resolved
|
||
except AssertionError: | ||
raise RuntimeError( | ||
"Encountered exception during cleanup of generator object, as well as exception in the contextmanager body" | ||
) from eg | ||
|
||
async def _move_elems_to_channel( | ||
agen: AsyncGenerator[T, None], | ||
send_chan: trio.MemorySendChannel[T], | ||
send_semaphore: trio.Semaphore, | ||
task_status: trio.TaskStatus, | ||
) -> None: | ||
# `async with send_chan` will eat exceptions, | ||
# see https://github.com/python-trio/trio/issues/1559 | ||
with send_chan: | ||
try: | ||
task_status.started() | ||
while True: | ||
# wait for receiver to call next on the aiter | ||
await send_semaphore.acquire() | ||
try: | ||
value = await agen.__anext__() | ||
except StopAsyncIteration: | ||
return | ||
# Send the value to the channel | ||
await send_chan.send(value) | ||
finally: | ||
# replace try-finally with contextlib.aclosing once python39 is dropped | ||
await agen.aclose() | ||
Zac-HD marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return context_manager |
Uh oh!
There was an error while loading. Please reload this page.