Implement backpressure for HTTP request body and WebSocket messages #427
HTTP request body chunks and WebSocket messages will be received from the underlying ASGI server only as fast as the application is able to handle them. Previously, input would be read from the client as quickly as possible, and would be buffered in memory if the application's handler didn't consume it fast enough. Now, the amount of input buffered in memory is limited to a single HTTP body chunk or WebSocket message, instead of being unbounded.
I originally intended to remove the connections' `receiver_task` entirely, and just call the ASGI receive function directly from `handler_task`, as I'd mentioned in a comment on #152. However, I realized that it's sufficient to just have `receiver_task` wait for space to be available in a bounded queue instead of immediately appending to unbounded storage.
This involved some changes for HTTP to make it use a queue instead of a byte array. The `Body` class no longer has methods to append chunks of data or signal completion, because it no longer owns the storage for the data; those methods are now on a separate queue object, and the `Body` just reads from the queue (via an iterator). In hindsight I could've had the body object own the queue so it could still have methods like `append` and `set_complete`, but it seems cleaner this way, and it's closer to how WebSocket is implemented.
The change for WebSocket is much simpler, because it already uses a queue, which just needed to have a size limit instead of being unbounded.
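For illustration only, the bounded-queue idea can be sketched with a plain `asyncio.Queue(maxsize=1)`. This is not the code in this PR: `BodySketch`, `receiver`, `_END`, and `demo` are hypothetical names, and the real queue object and `Body` class differ in detail.

```python
import asyncio

_END = object()  # hypothetical sentinel marking the end of the body


class BodySketch:
    """Hypothetical stand-in for Body: it only reads from a queue it does not own."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self._queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        chunk = await self._queue.get()
        if chunk is _END:
            raise StopAsyncIteration
        return chunk


async def receiver(receive, queue: asyncio.Queue) -> None:
    """Forward ASGI body chunks, waiting whenever the queue is full."""
    while True:
        event = await receive()
        if event["type"] != "http.request":
            break
        # With maxsize=1 this waits until the handler has taken the previous chunk.
        await queue.put(event.get("body", b""))
        if not event.get("more_body", False):
            break
    await queue.put(_END)


async def demo():
    events = iter([
        {"type": "http.request", "body": b"a", "more_body": True},
        {"type": "http.request", "body": b"b", "more_body": True},
        {"type": "http.request", "body": b"c", "more_body": False},
    ])

    async def receive():  # stand-in for the ASGI receive callable
        return next(events)

    queue = asyncio.Queue(maxsize=1)
    task = asyncio.create_task(receiver(receive, queue))
    chunks, max_queued = [], 0
    async for chunk in BodySketch(queue):
        max_queued = max(max_queued, queue.qsize())
        chunks.append(chunk)
        await asyncio.sleep(0)  # let the receiver run between reads
    await task
    return chunks, max_queued


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the receiver awaits `queue.put`, it cannot ask the server for another event until the handler has made room, which is what bounds the memory held in the queue.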
Note that if the client disconnects unexpectedly, the handler task might not be canceled immediately if there's pending input that the app's handler hasn't taken from the queue yet. Previously the receiver task was always ready to immediately get new events from the ASGI receive function so it would notice a disconnect right away, but now it has to wait for space in the queue when putting a data chunk into it, before it can get the next event from ASGI. However, backpressure means that there won't be a large amount of input queued up, so the handler task will be canceled in a timely manner once it starts consuming its input.
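The ordering described above can be reproduced in a small standalone sketch, again using a plain `asyncio.Queue(maxsize=1)` rather than the code in this PR. The `receiver` and `demo` functions and the `log` list are hypothetical, and the real implementation cancels the handler task where this sketch only records that the disconnect was seen.

```python
import asyncio


async def demo() -> list:
    log = []
    pending = iter([
        {"type": "http.request", "body": b"one", "more_body": True},
        {"type": "http.request", "body": b"two", "more_body": True},
        {"type": "http.disconnect"},
    ])

    async def receive():  # stand-in for the ASGI receive callable
        return next(pending)

    queue = asyncio.Queue(maxsize=1)

    async def receiver() -> None:
        while True:
            event = await receive()
            if event["type"] == "http.disconnect":
                # The real code would cancel the handler task at this point.
                log.append("disconnect seen")
                return
            # Waits here while the queue is full, so the next event
            # (possibly a disconnect) is not fetched yet.
            await queue.put(event["body"])

    task = asyncio.create_task(receiver())
    await asyncio.sleep(0.01)  # the handler is busy and not reading yet
    log.append("handler starts reading")
    log.append(await queue.get())  # frees a slot, unblocking the receiver
    await task
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
    # ['handler starts reading', b'one', 'disconnect seen']
```

The disconnect is noticed only after the handler takes a chunk, but since at most one chunk is queued, a single read is enough to unblock the receiver.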
Fixes #152