Implement backpressure for HTTP request body and WebSocket messages #427

Open
wants to merge 6 commits into main

Conversation

Wyzard256

HTTP request body chunks and WebSocket messages will be received from the underlying ASGI server only as fast as the application is able to handle them. Previously, input would be read from the client as quickly as possible, and would be buffered in memory if the application's handler didn't consume it fast enough. Now, the amount of input buffered in memory is limited to a single HTTP body chunk or WebSocket message, instead of being unbounded.

I originally intended to remove the connections' receiver_task entirely, and just call the ASGI receive function directly from handler_task, as I'd mentioned in a comment on #152. However, I realized that it's sufficient to just have receiver_task wait for space to be available in a bounded queue instead of immediately appending to unbounded storage.

This involved some changes for HTTP to make it use a queue instead of a byte array. The Body class no longer has methods to append chunks of data or signal completion, because it no longer owns the storage for the data; those methods are now on a separate queue object, and the Body just reads from the queue (via an iterator). In hindsight I could've had the body object own the queue so it could still have methods like append and set_complete, but it seems cleaner this way, and it's closer to how WebSocket is implemented.
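As a rough illustration of this bounded-queue approach (a minimal sketch using plain asyncio, not the PR's actual classes; the receiver/handler coroutines stand in for receiver_task and handler_task):

```python
import asyncio


async def receiver(queue: asyncio.Queue) -> None:
    # Stands in for receiver_task: it can only fetch the next event once
    # the previous chunk has been taken out of the bounded queue.
    for i in range(3):
        chunk = f"chunk {i}".encode()
        print("receiver: putting", chunk)
        await queue.put(chunk)  # blocks while the queue is full
    await queue.put(None)  # completion marker (illustrative)


async def handler(queue: asyncio.Queue) -> None:
    # Stands in for handler_task, consuming the body more slowly than it arrives.
    while (chunk := await queue.get()) is not None:
        await asyncio.sleep(0.1)  # simulate slow processing
        print("handler: consumed", chunk)


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)  # at most one buffered chunk
    await asyncio.gather(receiver(queue), handler(queue))


asyncio.run(main())
```

Because the queue holds at most one item, the receiver's put() cannot complete until the handler's get() does, which is exactly the backpressure described above.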

The change for WebSocket is much simpler, because it already uses a queue, which just needed to have a size limit instead of being unbounded.

Note that if the client disconnects unexpectedly, the handler task might not be canceled immediately if there's pending input that the app's handler hasn't taken from the queue yet. Previously the receiver task was always ready to get new events from the ASGI receive function, so it would notice a disconnect right away; now it has to wait for space in the queue when putting a data chunk into it before it can get the next event from ASGI. However, backpressure means there won't be a large amount of input queued up, so the handler task will be canceled in a timely manner once it starts consuming its input.

Fixes #152

This decouples __await__ from the details of how the data is received
from the client, by delegating those details to __anext__ instead.
Request.__init__ no longer takes an awaitable body object as a
parameter; it creates that object by itself.
Instead of ASGIHTTPConnection's receiver task appending the data to a
byte array, it now puts chunks into a queue that the request body reads
from.  The queue is limited to a single item, so the receiver task can
only put chunks into the queue as quickly as the handler task is taking
them out.  This limits the rate at which body chunks are obtained from
the ASGI receive function, and if the ASGI server also supports
backpressure, this will limit the rate at which input is actually received
from the client, so that large requests won't be buffered in memory if
the client is able to upload faster than the application can consume the
data.
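A hedged sketch of that receiver loop (illustrative names and a stand-in receive callable; the None end-of-body marker and the disconnect handling are assumptions, not the PR's exact code):

```python
import asyncio
from typing import Awaitable, Callable

ASGIReceive = Callable[[], Awaitable[dict]]


async def receive_body(receive: ASGIReceive, body_queue: asyncio.Queue) -> None:
    # body_queue has maxsize=1, so each put() waits until the handler has
    # taken the previous chunk; the next ASGI event is only requested once
    # there is room for its data.
    while True:
        event = await receive()
        if event["type"] == "http.request":
            await body_queue.put(event.get("body", b""))
            if not event.get("more_body", False):
                await body_queue.put(None)  # end-of-body marker (assumed)
                return
        elif event["type"] == "http.disconnect":
            return  # real code would also arrange for the handler task to be canceled


async def main() -> None:
    events = [
        {"type": "http.request", "body": b"hello ", "more_body": True},
        {"type": "http.request", "body": b"world", "more_body": False},
    ]

    async def receive() -> dict:
        return events.pop(0)

    queue: asyncio.Queue = asyncio.Queue(maxsize=1)

    async def consume() -> None:
        while (chunk := await queue.get()) is not None:
            print("handler got", chunk)

    await asyncio.gather(receive_body(receive, queue), consume())


asyncio.run(main())
```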

The Body class no longer has methods to append data or indicate
completion, because it no longer owns the storage of the data; instead,
those operations are now provided by the AsyncQueueIterator class, which
provides an iterator that the body reads from.  (For tests, there's also
a make_test_body_chunks function that produces an iterable with a
predefined sequence of chunks.)
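A minimal sketch of what such a queue-backed iterator might look like (the class and helper names come from the description above, but the maxsize, the sentinel, and the method details are assumptions):

```python
import asyncio
from typing import Iterable

_COMPLETE = object()  # sentinel marking the end of the body (assumed detail)


class AsyncQueueIterator:
    """Owns the bounded storage; the Body just iterates over it."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=1)

    async def append(self, data: bytes) -> None:
        await self._queue.put(data)  # waits until the consumer takes the previous chunk

    async def set_complete(self) -> None:
        await self._queue.put(_COMPLETE)

    def __aiter__(self) -> "AsyncQueueIterator":
        return self

    async def __anext__(self) -> bytes:
        data = await self._queue.get()
        if data is _COMPLETE:
            raise StopAsyncIteration
        return data


async def make_test_body_chunks(chunks: Iterable[bytes]):
    # Test helper: yields a predefined sequence of chunks without a queue.
    for chunk in chunks:
        yield chunk
```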

Because the body object only streams the data now instead of storing it,
the request's get_data() method no longer expects the body object to
provide the data more than once; instead, get_data() now stores the data
in the request object if caching is requested.
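Sketched roughly (illustrative signature and attribute names; the real method may differ):

```python
from typing import AsyncIterable, Optional


class Request:
    def __init__(self, body: AsyncIterable[bytes]) -> None:
        self.body = body  # streams chunks; consumable only once
        self._cached_data: Optional[bytes] = None

    async def get_data(self, cache: bool = True) -> bytes:
        if self._cached_data is not None:
            return self._cached_data
        data = b"".join([chunk async for chunk in self.body])
        if cache:
            self._cached_data = data  # later calls reuse this instead of re-reading the body
        return data
```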
WebSocket already uses a queue for received messages, but like HTTP, the
queue needs to be limited to a single item so that messages will be
accepted from the client no faster than the application is able to use
them.
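The gist of the change, sketched (attribute names are illustrative):

```python
import asyncio

# Before: unbounded, so messages pile up in memory if the handler is slow.
messages: asyncio.Queue = asyncio.Queue()

# After: bounded to a single message, so the receiver task waits for the
# handler before accepting the next message from the ASGI server.
messages = asyncio.Queue(maxsize=1)
```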
This is needed for use of typing_extensions.Self on Python 3.10, since
typing.Self doesn't exist before Python 3.11.
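The usual version-gated import for this looks like the following (a common pattern, not necessarily the PR's exact lines):

```python
import sys

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self
```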